In [None]:
!pip install pyspark py4j



In [None]:
import os
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, LongType, TimestampType

In [None]:
!pip install findspark



In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

#Loading the Dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
df = spark.read.csv('/content/sleep_cycle_productivity.csv', header=True, inferSchema=True, sep=",")
df.show(5, truncate=False)


+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+
|2024-04-12|1860     |32 |Other |23.33           |4.61          |5.28             |3            |86                 |87                  |116                          |8.80892009394567    |8                 |3         |6           |
|2024-11-04|1769     |41 |Female|21.02           |2.43          |5.4

# Viewing the Dataframe

In [None]:
df.show(5, truncate=False)

+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+
|2024-04-12|1860     |32 |Other |23.33           |4.61          |5.28             |3            |86                 |87                  |116                          |8.80892009394567    |8                 |3         |6           |
|2024-11-04|1769     |41 |Female|21.02           |2.43          |5.4

In [None]:
df.limit(5)

Date,Person_ID,Age,Gender,Sleep Start Time,Sleep End Time,Total Sleep Hours,Sleep Quality,Exercise (mins/day),Caffeine Intake (mg),Screen Time Before Bed (mins),Work Hours (hrs/day),Productivity Score,Mood Score,Stress Level
2024-04-12,1860,32,Other,23.33,4.61,5.28,3,86,87,116,8.80892009394567,8,3,6
2024-11-04,1769,41,Female,21.02,2.43,5.41,5,32,21,88,6.329833121584335,10,3,7
2024-08-31,2528,20,Male,22.1,3.45,5.35,7,17,88,59,8.506305742764315,10,9,10
2024-02-22,8041,37,Other,23.1,6.65,7.55,8,46,34,80,6.070239852800135,8,4,2
2024-02-23,4843,46,Other,21.42,4.17,6.75,10,61,269,94,11.374993880184936,8,7,9


# Viewing Dataframe Columns

In [None]:
df.columns

['Date',
 'Person_ID',
 'Age',
 'Gender',
 'Sleep Start Time',
 'Sleep End Time',
 'Total Sleep Hours',
 'Sleep Quality',
 'Exercise (mins/day)',
 'Caffeine Intake (mg)',
 'Screen Time Before Bed (mins)',
 'Work Hours (hrs/day)',
 'Productivity Score',
 'Mood Score',
 'Stress Level']

# Dataframe Schema

In [None]:
df.dtypes

[('Date', 'date'),
 ('Person_ID', 'int'),
 ('Age', 'int'),
 ('Gender', 'string'),
 ('Sleep Start Time', 'double'),
 ('Sleep End Time', 'double'),
 ('Total Sleep Hours', 'double'),
 ('Sleep Quality', 'int'),
 ('Exercise (mins/day)', 'int'),
 ('Caffeine Intake (mg)', 'int'),
 ('Screen Time Before Bed (mins)', 'int'),
 ('Work Hours (hrs/day)', 'double'),
 ('Productivity Score', 'int'),
 ('Mood Score', 'int'),
 ('Stress Level', 'int')]

In [None]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Person_ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Sleep Start Time: double (nullable = true)
 |-- Sleep End Time: double (nullable = true)
 |-- Total Sleep Hours: double (nullable = true)
 |-- Sleep Quality: integer (nullable = true)
 |-- Exercise (mins/day): integer (nullable = true)
 |-- Caffeine Intake (mg): integer (nullable = true)
 |-- Screen Time Before Bed (mins): integer (nullable = true)
 |-- Work Hours (hrs/day): double (nullable = true)
 |-- Productivity Score: integer (nullable = true)
 |-- Mood Score: integer (nullable = true)
 |-- Stress Level: integer (nullable = true)



# Inferring Schema Implicitly

In [None]:
df = spark.read.csv('/content/sleep_cycle_productivity.csv', header=True, sep=",", inferSchema=True)
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Person_ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Sleep Start Time: double (nullable = true)
 |-- Sleep End Time: double (nullable = true)
 |-- Total Sleep Hours: double (nullable = true)
 |-- Sleep Quality: integer (nullable = true)
 |-- Exercise (mins/day): integer (nullable = true)
 |-- Caffeine Intake (mg): integer (nullable = true)
 |-- Screen Time Before Bed (mins): integer (nullable = true)
 |-- Work Hours (hrs/day): double (nullable = true)
 |-- Productivity Score: integer (nullable = true)
 |-- Mood Score: integer (nullable = true)
 |-- Stress Level: integer (nullable = true)



# Defining Schema Explicitly

In [None]:
from pyspark.sql.types import *
df.columns

['Date',
 'Person_ID',
 'Age',
 'Gender',
 'Sleep Start Time',
 'Sleep End Time',
 'Total Sleep Hours',
 'Sleep Quality',
 'Exercise (mins/day)',
 'Caffeine Intake (mg)',
 'Screen Time Before Bed (mins)',
 'Work Hours (hrs/day)',
 'Productivity Score',
 'Mood Score',
 'Stress Level']

In [None]:
labels = [
     ('Date',StringType()),
     ('Person_ID',IntegerType()),
     ('Age',IntegerType()),
     ('Gender',StringType()),
     ('Sleep Start Time',DoubleType()),
     ('Sleep End Time',DoubleType()),
     ('Total Sleep Hours',DoubleType()),
     ('Sleep Quality',DoubleType()),
     ('Exercise (mins/day)',DoubleType()),
     ('Caffeine Intake (mg)',DoubleType()),
     ('Screen Time Before Bed (mins)',DoubleType()),
     ('Work Hours (hrs/day)',DoubleType()),
     ('Productivity Score',DoubleType()),
     ('Mood Score',DoubleType()),
     ('Stress Level',DoubleType()),
]

In [None]:
schema = StructType([StructField (x[0], x[1], True) for x in labels])
schema

StructType([StructField('Date', StringType(), True), StructField('Person_ID', IntegerType(), True), StructField('Age', IntegerType(), True), StructField('Gender', StringType(), True), StructField('Sleep Start Time', DoubleType(), True), StructField('Sleep End Time', DoubleType(), True), StructField('Total Sleep Hours', DoubleType(), True), StructField('Sleep Quality', DoubleType(), True), StructField('Exercise (mins/day)', DoubleType(), True), StructField('Caffeine Intake (mg)', DoubleType(), True), StructField('Screen Time Before Bed (mins)', DoubleType(), True), StructField('Work Hours (hrs/day)', DoubleType(), True), StructField('Productivity Score', DoubleType(), True), StructField('Mood Score', DoubleType(), True), StructField('Stress Level', DoubleType(), True)])

In [None]:
df = spark.read.csv('/content/sleep_cycle_productivity.csv', header=True, sep=",", schema=schema)
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Person_ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Sleep Start Time: double (nullable = true)
 |-- Sleep End Time: double (nullable = true)
 |-- Total Sleep Hours: double (nullable = true)
 |-- Sleep Quality: double (nullable = true)
 |-- Exercise (mins/day): double (nullable = true)
 |-- Caffeine Intake (mg): double (nullable = true)
 |-- Screen Time Before Bed (mins): double (nullable = true)
 |-- Work Hours (hrs/day): double (nullable = true)
 |-- Productivity Score: double (nullable = true)
 |-- Mood Score: double (nullable = true)
 |-- Stress Level: double (nullable = true)



In [None]:
df = spark.read.csv('/content/sleep_cycle_productivity.csv', header=True, sep=",", inferSchema=True)
df.show(truncate=False)

+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+
|2024-04-12|1860     |32 |Other |23.33           |4.61          |5.28             |3            |86                 |87                  |116                          |8.80892009394567    |8                 |3         |6           |
|2024-11-04|1769     |41 |Female|21.02           |2.43          |5.4

# DataFrame Operations on Columns

In [None]:
print(df['Total Sleep Hours'])
print("*"*20)
df.select(df['Total Sleep Hours']).show(truncate=False)

Column<'Total Sleep Hours'>
********************
+-----------------+
|Total Sleep Hours|
+-----------------+
|5.28             |
|5.41             |
|5.35             |
|7.55             |
|6.75             |
|8.64             |
|8.03             |
|6.35             |
|7.31             |
|7.52             |
|6.12             |
|7.96             |
|8.06             |
|7.15             |
|7.67             |
|7.95             |
|7.08             |
|7.71             |
|7.76             |
|7.04             |
+-----------------+
only showing top 20 rows



In [None]:
# 3rd method
# Column name is case insensitive here
from pyspark.sql.functions import col
df.select(col('Total Sleep Hours')).show(truncate=False)

+-----------------+
|Total Sleep Hours|
+-----------------+
|5.28             |
|5.41             |
|5.35             |
|7.55             |
|6.75             |
|8.64             |
|8.03             |
|6.35             |
|7.31             |
|7.52             |
|6.12             |
|7.96             |
|8.06             |
|7.15             |
|7.67             |
|7.95             |
|7.08             |
|7.71             |
|7.76             |
|7.04             |
+-----------------+
only showing top 20 rows



### Selecting Multiple Columns

In [None]:
print(df['Total Sleep Hours'], df['Sleep Quality'])
print("*"*40)
df.select(df['Total Sleep Hours'], df['Sleep Quality']).show(truncate=False)

Column<'Total Sleep Hours'> Column<'Sleep Quality'>
****************************************
+-----------------+-------------+
|Total Sleep Hours|Sleep Quality|
+-----------------+-------------+
|5.28             |3            |
|5.41             |5            |
|5.35             |7            |
|7.55             |8            |
|6.75             |10           |
|8.64             |10           |
|8.03             |3            |
|6.35             |8            |
|7.31             |7            |
|7.52             |4            |
|6.12             |7            |
|7.96             |10           |
|8.06             |5            |
|7.15             |10           |
|7.67             |1            |
|7.95             |7            |
|7.08             |1            |
|7.71             |1            |
|7.76             |9            |
|7.04             |6            |
+-----------------+-------------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import col
df.select(col('Total Sleep Hours'),col('Sleep Quality')).show(truncate=False)

+-----------------+-------------+
|Total Sleep Hours|Sleep Quality|
+-----------------+-------------+
|5.28             |3            |
|5.41             |5            |
|5.35             |7            |
|7.55             |8            |
|6.75             |10           |
|8.64             |10           |
|8.03             |3            |
|6.35             |8            |
|7.31             |7            |
|7.52             |4            |
|6.12             |7            |
|7.96             |10           |
|8.06             |5            |
|7.15             |10           |
|7.67             |1            |
|7.95             |7            |
|7.08             |1            |
|7.71             |1            |
|7.76             |9            |
|7.04             |6            |
+-----------------+-------------+
only showing top 20 rows



### Adding New Columns

In [None]:
# CASE 1: Adding a new column
# We will add a new column called 'first_column' at the end
from pyspark.sql.functions import lit
df = df.withColumn('first_column',lit(1))
# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.
df.show(5,truncate=False)

+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|first_column|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+------------+
|2024-04-12|1860     |32 |Other |23.33           |4.61          |5.28             |3            |86                 |87                  |116                          |8.80892009394567    |8                 |3         |6           |1           |
|2024-11-04|1769

In [None]:
# CASE 2: Adding multiple columns
# We will add two new columns called 'second_column' and 'third_column' at the end
df = df.withColumn('second_column', lit(2)) \
       .withColumn('third_column', lit('Third Column'))
# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.
df.show(5,truncate=False)

+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+------------+-------------+------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|first_column|second_column|third_column|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+------------+-------------+------------+
|2024-04-12|1860     |32 |Other |23.33           |4.61          |5.28             |3            |86                 |87                  |116                          |8.80892009394

In [None]:
# CASE 3: Deriving a new column from an exisitng one
# We will add a new column called 'car_model' which has the value of car and model appended together with a space in between
from pyspark.sql.functions import concat
df = df.withColumn('Age_Sleep Quality', concat(col("Age"), lit(" "), col("Sleep Quality")))
# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.
df.show(5,truncate=False)

+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+------------+-------------+------------+-----------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|first_column|second_column|third_column|Age_Sleep Quality|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+------------+-------------+------------+-----------------+
|2024-04-12|1860     |32 |Other |23.33           |4.61          |5.28             |3            |86                 |87        

### Renaming Columns

In [None]:
#Renaming a column in PySpark
df = df.withColumnRenamed('first_column', 'new_column_one') \
       .withColumnRenamed('second_column', 'new_column_two') \
       .withColumnRenamed('third_column', 'new_column_three')
df.show(truncate=False)

+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+--------------+--------------+----------------+-----------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|new_column_one|new_column_two|new_column_three|Age_Sleep Quality|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+--------------+--------------+----------------+-----------------+
|2024-04-12|1860     |32 |Other |23.33           |4.61          |5.28             |3            |86       

### Grouping By Columns

In [None]:
# Group By a column in PySpark
df.groupBy('Sleep Quality').count().show(5)

+-------------+-----+
|Sleep Quality|count|
+-------------+-----+
|            1|  480|
|            6|  489|
|            3|  490|
|            5|  521|
|            9|  480|
+-------------+-----+
only showing top 5 rows



In [None]:
# Group By multiple columns in PySpark
df.groupBy('Sleep Quality', 'Caffeine Intake (mg)').count().show(5)

+-------------+--------------------+-----+
|Sleep Quality|Caffeine Intake (mg)|count|
+-------------+--------------------+-----+
|            3|                  22|    1|
|            3|                  57|    8|
|            8|                 264|    2|
|            7|                  55|    2|
|            3|                  89|    2|
+-------------+--------------------+-----+
only showing top 5 rows



### Removing Columns

In [None]:
#Remove columns in PySpark
df = df.drop('new_column_one')
df.show(5,truncate=False)

+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+--------------+----------------+-----------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|new_column_two|new_column_three|Age_Sleep Quality|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+--------------+----------------+-----------------+
|2024-04-12|1860     |32 |Other |23.33           |4.61          |5.28             |3            |86                 |87                  |116          

In [None]:
#Remove multiple columnss in one go
df = df.drop('new_column_two') \
       .drop('new_column_three')
df.show(5,truncate=False)

+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+-----------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|Age_Sleep Quality|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+-----------------+
|2024-04-12|1860     |32 |Other |23.33           |4.61          |5.28             |3            |86                 |87                  |116                          |8.80892009394567    |8                 |3         |6           |32 3           

## DataFrame Operations on Rows

We will discuss the follwoing in this section:

1.   Filtering Rows
2. 	 Get Distinct Rows
3.   Sorting Rows
4.   Union Dataframes

### Filtering Rows

In [None]:
# Filtering rows in PySpark
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count))
exercise_filtered_count = df.filter(col('Exercise (mins/day)')>=30).count()
print("EXERCISE FILTERED RECORD COUNT: " + str(exercise_filtered_count))
df.filter(col('Exercise (mins/day)')>=30).show(truncate=False)

TOTAL RECORD COUNT: 5000
EXERCISE FILTERED RECORD COUNT: 3280
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+-----------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|Age_Sleep Quality|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+-----------------+
|2024-04-12|1860     |32 |Other |23.33           |4.61          |5.28             |3            |86                 |87                  |116                          |8.80892009394567 

In [None]:
# Filtering rows in PySpark based on Multiple conditions
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count))
exercise_filtered_count = df.filter((col('Exercise (mins/day)')>=30) &
                                  (col('Sleep Quality')>=5)).count() # Two conditions added here
print("EUROPE FILTERED RECORD COUNT: " + str(exercise_filtered_count))
df.filter(col('Exercise (mins/day)')>=30).show(truncate=False)

TOTAL RECORD COUNT: 5000
EUROPE FILTERED RECORD COUNT: 1988
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+-----------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|Age_Sleep Quality|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+-----------------+
|2024-04-12|1860     |32 |Other |23.33           |4.61          |5.28             |3            |86                 |87                  |116                          |8.80892009394567   

### Get Distinct Rows

In [None]:
#Get Unique Rows in PySpark
df.select('Exercise (mins/day)').distinct().show()

+-------------------+
|Exercise (mins/day)|
+-------------------+
|                 31|
|                 85|
|                 65|
|                 53|
|                 78|
|                 34|
|                 81|
|                 28|
|                 76|
|                 26|
|                 27|
|                 44|
|                 12|
|                 22|
|                 47|
|                  1|
|                 52|
|                 13|
|                 86|
|                  6|
+-------------------+
only showing top 20 rows



In [None]:
#Get Unique Rows in PySpark based on mutliple columns
df.select('Exercise (mins/day)','Sleep Quality').distinct().show()

+-------------------+-------------+
|Exercise (mins/day)|Sleep Quality|
+-------------------+-------------+
|                 79|           10|
|                 71|            4|
|                 43|            7|
|                 86|           10|
|                 33|            8|
|                 59|            7|
|                 62|            1|
|                 41|            3|
|                 48|            6|
|                 10|            2|
|                 23|            6|
|                 69|            1|
|                 34|            5|
|                 16|            3|
|                 83|            6|
|                 17|            1|
|                 27|            9|
|                 17|            2|
|                 73|            4|
|                 13|           10|
+-------------------+-------------+
only showing top 20 rows



### Sorting Rows

In [None]:
# Sort Rows in PySpark
# By default the data will be sorted in ascending order
df.orderBy('Exercise (mins/day)').show(truncate=False)

+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+-----------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|Age_Sleep Quality|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+-----------------+
|2024-08-31|1711     |31 |Other |20.35           |3.61          |7.26             |6            |0                  |19                  |163                          |9.090458180962525   |2                 |3         |7           |31 6           

In [None]:
# To change the sorting order, you can use the ascending parameter
df.orderBy('Exercise (mins/day)', ascending=False).show(truncate=False)

+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+-----------------+
|Date      |Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|Age_Sleep Quality|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+-----------------+
|2024-09-03|1984     |38 |Female|23.25           |5.21          |5.96             |2            |89                 |103                 |126                          |11.04540322170719   |9                 |8         |6           |38 2           

In [None]:
# Using groupBy aand orderBy together
df.groupBy("Gender").count().orderBy('count', ascending=False).show(10)

+------+-----+
|Gender|count|
+------+-----+
|  Male| 1718|
|Female| 1675|
| Other| 1607|
+------+-----+



### Union Dataframes

In [None]:
# CASE 1: Union When columns are in order
df = spark.read.csv('/content/sleep_cycle_productivity.csv', header=True, sep=",", inferSchema=True)
male_caffeine = df.filter((col('Gender')=='Male') & (col('Caffeine Intake (mg)')>0))
female_caffeine = df.filter((col('Gender')=='Female') & (col('Caffeine Intake (mg)')>0))
print("MALE CAFFEINE: "+str(male_caffeine.count()))
print("FEMALE CAFFEINE: "+str(female_caffeine.count()))
print("MALE FEMALE CAFFEINE: "+str(male_caffeine.union(female_caffeine).count()))

MALE CAFFEINE: 1716
FEMALE CAFFEINE: 1668
MALE FEMALE CAFFEINE: 3384


In [None]:
# CASE 1: Union When columns are not in order
# Creating two dataframes with jumbled columns
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+



## Common Data Manipulation Functions

In [None]:
# Functions available in PySpark
from pyspark.sql import functions
# Similar to python, we can use the dir function to view the avaiable functions
print(dir(functions))



### String Functions

In [None]:
# Loading the data
from pyspark.sql.functions import col
df = spark.read.csv('/content/sleep_cycle_productivity.csv', header=True, sep=",", inferSchema=True)

In [None]:
from pyspark.sql.functions import col,lower, upper, substring
# Prints out the details of a function
help(substring)
# alias is used to rename the column name in the output
df.select(col('Gender'),lower(col('Gender')),upper(col('Gender')),substring(col('Gender'),1,4).alias("concatenated value")).show(5, False)

Help on function substring in module pyspark.sql.functions:

substring(str: 'ColumnOrName', pos: int, len: int) -> pyspark.sql.column.Column
    Substring starts at `pos` and is of length `len` when str is String type or
    returns the slice of byte array that starts at `pos` in byte and is of length `len`
    when str is Binary type.
    
    .. versionadded:: 1.5.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Notes
    -----
    The position is not zero based, but 1 based index.
    
    Parameters
    ----------
    str : :class:`~pyspark.sql.Column` or str
        target column to work on.
    pos : int
        starting position in str.
    len : int
        length of chars.
    
    Returns
    -------
    :class:`~pyspark.sql.Column`
        substring of given value.
    
    Examples
    --------
    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(substring(df.s, 1, 2).alias('s')).collect()
    [Row(s='ab')]

+------+--------

In [None]:
from pyspark.sql.functions import concat
df.select(col("Caffeine Intake (mg)"),col("Sleep Quality"),concat(col("Caffeine Intake (mg)"), lit(" "), col("Sleep Quality"))).show(5, False)

+--------------------+-------------+----------------------------------------------+
|Caffeine Intake (mg)|Sleep Quality|concat(Caffeine Intake (mg),  , Sleep Quality)|
+--------------------+-------------+----------------------------------------------+
|87                  |3            |87 3                                          |
|21                  |5            |21 5                                          |
|88                  |7            |88 7                                          |
|34                  |8            |34 8                                          |
|269                 |10           |269 10                                        |
+--------------------+-------------+----------------------------------------------+
only showing top 5 rows



### Numeric functions

Show the oldest date and the most recent date

In [None]:
from pyspark.sql.functions import min, max
df.select(min(col('Total Sleep Hours')), max(col('Total Sleep Hours'))).show()

+----------------------+----------------------+
|min(Total Sleep Hours)|max(Total Sleep Hours)|
+----------------------+----------------------+
|                   4.5|                   9.5|
+----------------------+----------------------+



Add 10 to the minimum and maximum weight

In [None]:
from pyspark.sql.functions import min, max, lit
df.select(min(col('Total Sleep Hours'))+lit(10), max(col('Total Sleep Hours')+lit(10))).show()

+-----------------------------+-----------------------------+
|(min(Total Sleep Hours) + 10)|max((Total Sleep Hours + 10))|
+-----------------------------+-----------------------------+
|                         14.5|                         19.5|
+-----------------------------+-----------------------------+



### Operations on Date

In [None]:
from pyspark.sql.functions import to_date, to_timestamp, lit
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df.show()
df.printSchema()

+-------------------+
|                DOB|
+-------------------+
|2019-12-25 13:30:00|
+-------------------+

root
 |-- DOB: string (nullable = true)



In [None]:
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'),'yyyy-MM-dd HH:mm:ss'), to_timestamp(col('DOB'),'yyyy-MM-dd HH:mm:ss'))
df.show()
df.printSchema()

+---------------------------------+--------------------------------------+
|to_date(DOB, yyyy-MM-dd HH:mm:ss)|to_timestamp(DOB, yyyy-MM-dd HH:mm:ss)|
+---------------------------------+--------------------------------------+
|                       2019-12-25|                   2019-12-25 13:30:00|
+---------------------------------+--------------------------------------+

root
 |-- to_date(DOB, yyyy-MM-dd HH:mm:ss): date (nullable = true)
 |-- to_timestamp(DOB, yyyy-MM-dd HH:mm:ss): timestamp (nullable = true)



In [None]:
df = spark.createDataFrame([('25/Dec/2019 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'),'dd/MMM/yyyy HH:mm:ss'), to_timestamp(col('DOB'),'dd/MMM/yyyy HH:mm:ss'))
df.show()
df.printSchema()

+----------------------------------+---------------------------------------+
|to_date(DOB, dd/MMM/yyyy HH:mm:ss)|to_timestamp(DOB, dd/MMM/yyyy HH:mm:ss)|
+----------------------------------+---------------------------------------+
|                        2019-12-25|                    2019-12-25 13:30:00|
+----------------------------------+---------------------------------------+

root
 |-- to_date(DOB, dd/MMM/yyyy HH:mm:ss): date (nullable = true)
 |-- to_timestamp(DOB, dd/MMM/yyyy HH:mm:ss): timestamp (nullable = true)



**What is 3 days earlier that the oldest date and 3 days later than the most recent date?**

In [None]:
from pyspark.sql.functions import date_add, date_sub
# create a dummy dataframe
df = spark.createDataFrame([('1990-01-01',),('1995-01-03',),('2021-03-30',)], ['Date'])
# find out the required dates
df.select(date_add(max(col('Date')),3), date_sub(min(col('Date')),3)).show()

+----------------------+----------------------+
|date_add(max(Date), 3)|date_sub(min(Date), 3)|
+----------------------+----------------------+
|            2021-04-02|            1989-12-29|
+----------------------+----------------------+



## Joins in PySpark

In [None]:
# Create two dataframes
cars_df = spark.createDataFrame([[1, 'Car A'],[2, 'Car B'],[3, 'Car C']], ["id", "car_name"])
car_price_df = spark.createDataFrame([[1, 1000],[2, 2000],[3, 3000]], ["id", "car_price"])
cars_df.show()
car_price_df.show()

+---+--------+
| id|car_name|
+---+--------+
|  1|   Car A|
|  2|   Car B|
|  3|   Car C|
+---+--------+

+---+---------+
| id|car_price|
+---+---------+
|  1|     1000|
|  2|     2000|
|  3|     3000|
+---+---------+



In [None]:
# Executing an inner join so we can see the id, name and price of each car in one row
cars_df.join(car_price_df, cars_df.id == car_price_df.id, 'inner').select(cars_df['id'],cars_df['car_name'],car_price_df['car_price']).show(truncate=False)

+---+--------+---------+
|id |car_name|car_price|
+---+--------+---------+
|1  |Car A   |1000     |
|2  |Car B   |2000     |
|3  |Car C   |3000     |
+---+--------+---------+



As you can see, we have done an inner join between two dataframes. The following joins are supported by PySpark:
1. inner (default)
2. cross
3. outer
4. full
5. full_outer
6. left
7. left_outer
8. right
9. right_outer
10. left_semi
11. left_anti

## Spark SQL

In [None]:
# Load data
df = spark.read.csv('/content/sleep_cycle_productivity.csv', header=True, sep=",")
# Register Temporary Table
df.createOrReplaceTempView("temp")
# Select all data from temp table
spark.sql("select * from temp limit 5").show()
# Select count of data in table
spark.sql("select count(*) as total_count from temp").show()

+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+
|      Date|Person_ID|Age|Gender|Sleep Start Time|Sleep End Time|Total Sleep Hours|Sleep Quality|Exercise (mins/day)|Caffeine Intake (mg)|Screen Time Before Bed (mins)|Work Hours (hrs/day)|Productivity Score|Mood Score|Stress Level|
+----------+---------+---+------+----------------+--------------+-----------------+-------------+-------------------+--------------------+-----------------------------+--------------------+------------------+----------+------------+
|2024-04-12|     1860| 32| Other|           23.33|          4.61|             5.28|            3|                 86|                  87|                          116|    8.80892009394567|                 8|         3|           6|
|2024-11-04|     1769| 41|Female|           21.02|          2.43|   

## RDD

> ด้วย `map` คุณสามารถกำหนดฟังก์ชันและใช้กับแต่ละเรคอร์ดใน RDD ได้  
> `flatMap` จะสร้าง RDD ใหม่โดยใช้ฟังก์ชันกับทุกองค์ประกอบใน RDD และทำให้ผลลัพธ์แบนราบ (flattening)  
> `filter` จะสร้าง RDD ใหม่ที่มีเพียงองค์ประกอบที่ตรงตามเงื่อนไขที่กำหนด  
> ส่วน `reduce` ใช้สำหรับรวมค่าขององค์ประกอบที่อยู่ติดกันเป็นผลลัพธ์เดียว  

ตัวอย่างเช่น หากคุณมีชุดตัวเลข คุณสามารถใช้ `reduce` เพื่อนำตัวเลขทั้งหมดมารวมกันโดยให้ฟังก์ชันรับค่าเข้ามาสองค่าและลดให้เหลือหนึ่งค่า

### เหตุผลที่ควรใช้ DataFrame แทน RDD:
1. **สามารถแทนข้อมูลในรูปแบบแถวและคอลัมน์** ซึ่งช่วยให้เข้าใจง่ายขึ้น แต่ก็หมายความว่ามันสามารถรองรับเฉพาะข้อมูลที่มีโครงสร้าง (structured) และกึ่งโครงสร้าง (semi-structured) เท่านั้น  
2. **รองรับการประมวลผลข้อมูลจากหลายรูปแบบ** เช่น AVRO, CSV, JSON และสามารถดึงข้อมูลจากระบบจัดเก็บข้อมูลต่าง ๆ เช่น HDFS, Hive Tables, และ MySQL  
3. **มีความสามารถในการปรับปรุงประสิทธิภาพงาน (Job Optimization) ที่ดีกว่า RDD**  
4. **DataFrame API ใช้งานง่ายมาก** และเหมาะสำหรับการวิเคราะห์ข้อมูล  

In [None]:
sleep = spark.sparkContext.textFile('/content/sleep_cycle_productivity.csv')
print(sleep.first())
sleep_header = sleep.first()
sleep_rest = sleep.filter(lambda line: line!=sleep_header)
print(sleep.first())

Date,Person_ID,Age,Gender,Sleep Start Time,Sleep End Time,Total Sleep Hours,Sleep Quality,Exercise (mins/day),Caffeine Intake (mg),Screen Time Before Bed (mins),Work Hours (hrs/day),Productivity Score,Mood Score,Stress Level
Date,Person_ID,Age,Gender,Sleep Start Time,Sleep End Time,Total Sleep Hours,Sleep Quality,Exercise (mins/day),Caffeine Intake (mg),Screen Time Before Bed (mins),Work Hours (hrs/day),Productivity Score,Mood Score,Stress Level


**How many cars are there in our csv data?**

In [None]:
sleep_rest.map(lambda line: line.split(",")).count()

5000

**Display the Date, Person_ID, Age, Gender and Total Sleep Hours of Female**

In [None]:
# Car name is column  0
(sleep_rest.filter(lambda line: line.split(",")[3]=='Female').
 map(lambda line: (line.split(",")[0],
    line.split(",")[1],
    line.split(",")[2],
    line.split(",")[3],
    line.split(",")[5])).collect())

[('2024-11-04', '1769', '41', 'Female', '2.43'),
 ('2024-01-28', '7278', '26', 'Female', '3.14'),
 ('2024-02-21', '6116', '49', 'Female', '3.89'),
 ('2024-02-14', '3693', '45', 'Female', '3.29'),
 ('2024-04-22', '2853', '29', 'Female', '6.08'),
 ('2024-10-24', '1262', '33', 'Female', '7.02'),
 ('2024-04-08', '3255', '20', 'Female', '7.24'),
 ('2024-12-14', '3961', '35', 'Female', '5.33'),
 ('2024-10-09', '1225', '38', 'Female', '4.29'),
 ('2024-08-10', '7709', '20', 'Female', '7.39'),
 ('2024-11-16', '7655', '33', 'Female', '6.02'),
 ('2024-02-27', '8509', '59', 'Female', '6.07'),
 ('2024-09-18', '1876', '49', 'Female', '6.25'),
 ('2024-02-16', '3141', '22', 'Female', '6.32'),
 ('2024-04-09', '3849', '48', 'Female', '4.32'),
 ('2024-06-08', '1851', '41', 'Female', '4.13'),
 ('2024-04-24', '9808', '24', 'Female', '4.46'),
 ('2024-07-10', '3786', '52', 'Female', '4.04'),
 ('2024-05-23', '2062', '59', 'Female', '5.46'),
 ('2024-09-09', '2542', '30', 'Female', '5.78'),
 ('2024-05-05', '526

**Display the Date, Person_ID, Age, Gender and Total Sleep Hours of Male and Female**

In [None]:
# Car name is column  0
(sleep_rest.filter(lambda line: line.split(",")[3] in ['Male', 'Female']).
 map(lambda line: (line.split(",")[0],
    line.split(",")[1],
    line.split(",")[2],
    line.split(",")[3],
    line.split(",")[5])).collect())

[('2024-11-04', '1769', '41', 'Female', '2.43'),
 ('2024-08-31', '2528', '20', 'Male', '3.45'),
 ('2024-07-08', '7439', '38', 'Male', '6.41'),
 ('2024-01-28', '7278', '26', 'Female', '3.14'),
 ('2024-02-21', '6116', '49', 'Female', '3.89'),
 ('2024-02-14', '3693', '45', 'Female', '3.29'),
 ('2024-04-22', '2853', '29', 'Female', '6.08'),
 ('2024-09-11', '5389', '41', 'Male', '7.22'),
 ('2024-10-24', '1262', '33', 'Female', '7.02'),
 ('2024-04-08', '3255', '20', 'Female', '7.24'),
 ('2024-12-14', '3961', '35', 'Female', '5.33'),
 ('2024-05-14', '2218', '34', 'Male', '6.23'),
 ('2024-10-09', '1225', '38', 'Female', '4.29'),
 ('2024-01-06', '2060', '41', 'Male', '6.08'),
 ('2024-08-10', '7709', '20', 'Female', '7.39'),
 ('2024-10-25', '8343', '56', 'Male', '4.5'),
 ('2024-11-16', '7655', '33', 'Female', '6.02'),
 ('2024-02-27', '8509', '59', 'Female', '6.07'),
 ('2024-09-18', '1876', '49', 'Female', '6.25'),
 ('2024-11-14', '6177', '43', 'Male', '2.77'),
 ('2024-02-16', '3141', '22', 'Fema

### Create a totally empty dataframe

In [None]:
from pyspark.sql.types import StructType
sc = spark.sparkContext
#Create empty df
schema = StructType([])
empty = spark.createDataFrame(sc.emptyRDD(), schema)
empty.show()

++
||
++
++



### Create an empty dataframe with header

In [None]:
from pyspark.sql.types import StructType, StructField
#Create empty df with header
schema_header = StructType([StructField("name", StringType(), True)])
empty_with_header = spark.createDataFrame(sc.emptyRDD(), schema_header)
empty_with_header.show()

+----+
|name|
+----+
+----+



### Create a dataframe with header and data

In [None]:
from pyspark.sql import Row
mylist = [
  {"name":'Alice',"age":13},
  {"name":'Jacob',"age":24},
  {"name":'Betty',"age":135},
]
spark.createDataFrame(Row(**x) for x in mylist).show()

+-----+---+
| name|age|
+-----+---+
|Alice| 13|
|Jacob| 24|
|Betty|135|
+-----+---+



In [None]:
# You can achieve the same using this - note that we are using spark context here, not a spark session
from pyspark.sql import Row
df = sc.parallelize([
        Row(name='Alice', age=13),
        Row(name='Jacob', age=24),
        Row(name='Betty', age=135)]).toDF()
df.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 13|
|Jacob| 24|
|Betty|135|
+-----+---+



## Drop Duplicates

In [None]:
from pyspark.sql import Row
from pyspark.sql import Row
mylist = [
  {"name":'Alice',"age":5,"height":80},
  {"name":'Jacob',"age":24,"height":80},
  {"name":'Alice',"age":5,"height":80}
]
df = spark.createDataFrame(Row(**x) for x in mylist)
df.dropDuplicates().show()

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Jacob| 24|    80|
+-----+---+------+



In [None]:
df.dropDuplicates(subset=['height']).show()

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
+-----+---+------+

