In [1]:
import findspark 
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("ETL using Spark").getOrCreate()

In [3]:
data = [("student1",64,90),
        ("student2",59,100),
        ("student3",69,95),
        ("",70,110),
        ("student5",60,80),
        ("student3",69,95),
        ("student6",62,85),
        ("student7",65,80),
        ("student7",65,80)]

In [4]:
df = spark.createDataFrame(data, ['student', 'height_inches', 'weight_pounds'])
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student1|           64|           90|
|student2|           59|          100|
|student3|           69|           95|
|        |           70|          110|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|student7|           65|           80|
|student7|           65|           80|
+--------+-------------+-------------+



In [5]:
df.write.mode("overwrite").csv("student_hw.csv", header=True)

In [6]:
df1 = spark.read.csv("student_hw.csv", header=True, inferSchema=True)

In [8]:
df1 = df1.dropDuplicates()
df1.show()


+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
|student5|           60|           80|
|student6|           62|           85|
|    NULL|           70|          110|
+--------+-------------+-------------+



In [9]:
df1.count()

7

In [10]:
df1 = df1.dropna()

In [11]:
df1.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
|student5|           60|           80|
|student6|           62|           85|
+--------+-------------+-------------+



In [12]:
df1.write.mode("overwrite").parquet("student_hw.parquet")

In [13]:
# condesnse parquet to a single file 
df1 = df1.repartition(1) # reduce the number of partitions 

In [14]:
df1.write.mode("overwrite").parquet("student-hw-single.parquet")

In [15]:
df = spark.read.parquet("student-hw-single.parquet")
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
|student5|           60|           80|
|student6|           62|           85|
+--------+-------------+-------------+



In [16]:
from pyspark.sql.functions import expr

In [17]:
# convert inches to centimeters
df = df.withColumn("height_centimeters", expr("height_inches  * 2.54"))
df.show()

+--------+-------------+-------------+------------------+
| student|height_inches|weight_pounds|height_centimeters|
+--------+-------------+-------------+------------------+
|student7|           65|           80|            165.10|
|student2|           59|          100|            149.86|
|student1|           64|           90|            162.56|
|student3|           69|           95|            175.26|
|student5|           60|           80|            152.40|
|student6|           62|           85|            157.48|
+--------+-------------+-------------+------------------+



In [18]:
# convert punds to kilograms
df = df.withColumn("weight_kg", expr("weight_pounds * 0.453592"))
df.show()

+--------+-------------+-------------+------------------+---------+
| student|height_inches|weight_pounds|height_centimeters|weight_kg|
+--------+-------------+-------------+------------------+---------+
|student7|           65|           80|            165.10|36.287360|
|student2|           59|          100|            149.86|45.359200|
|student1|           64|           90|            162.56|40.823280|
|student3|           69|           95|            175.26|43.091240|
|student5|           60|           80|            152.40|36.287360|
|student6|           62|           85|            157.48|38.555320|
+--------+-------------+-------------+------------------+---------+



In [19]:
df = df.drop("height_inches", 'height_pounds')
df.show()

+--------+-------------+------------------+---------+
| student|weight_pounds|height_centimeters|weight_kg|
+--------+-------------+------------------+---------+
|student7|           80|            165.10|36.287360|
|student2|          100|            149.86|45.359200|
|student1|           90|            162.56|40.823280|
|student3|           95|            175.26|43.091240|
|student5|           80|            152.40|36.287360|
|student6|           85|            157.48|38.555320|
+--------+-------------+------------------+---------+



In [20]:
df = df.withColumnRenamed('height_centimeters', 'height_cm')
df.show()

+--------+-------------+---------+---------+
| student|weight_pounds|height_cm|weight_kg|
+--------+-------------+---------+---------+
|student7|           80|   165.10|36.287360|
|student2|          100|   149.86|45.359200|
|student1|           90|   162.56|40.823280|
|student3|           95|   175.26|43.091240|
|student5|           80|   152.40|36.287360|
|student6|           85|   157.48|38.555320|
+--------+-------------+---------+---------+



In [21]:
df.write.mode("overwrite").csv("student_transformed.csv", header = True)

In [22]:
# verify the csv file 
df = spark.read.csv("student_transformed.csv", header = True, inferSchema = True)
df.show()

+--------+-------------+---------+---------+
| student|weight_pounds|height_cm|weight_kg|
+--------+-------------+---------+---------+
|student7|           80|    165.1| 36.28736|
|student2|          100|   149.86|  45.3592|
|student1|           90|   162.56| 40.82328|
|student3|           95|   175.26| 43.09124|
|student5|           80|    152.4| 36.28736|
|student6|           85|   157.48| 38.55532|
+--------+-------------+---------+---------+



In [23]:
spark.stop()

# Exercise

In [24]:
spark1 = SparkSession.builder.appName("Exercise - ETL using Spark").getOrCreate()

In [25]:
df = spark1.read.csv("student_transformed.csv", header = True, inferSchema = True)
df.printSchema()

root
 |-- student: string (nullable = true)
 |-- weight_pounds: integer (nullable = true)
 |-- height_cm: double (nullable = true)
 |-- weight_kg: double (nullable = true)



In [26]:
df = df.withColumn("height_meters", expr("height_cm / 100"))
df.show()

+--------+-------------+---------+---------+------------------+
| student|weight_pounds|height_cm|weight_kg|     height_meters|
+--------+-------------+---------+---------+------------------+
|student7|           80|    165.1| 36.28736|             1.651|
|student2|          100|   149.86|  45.3592|1.4986000000000002|
|student1|           90|   162.56| 40.82328|            1.6256|
|student3|           95|   175.26| 43.09124|            1.7526|
|student5|           80|    152.4| 36.28736|             1.524|
|student6|           85|   157.48| 38.55532|            1.5748|
+--------+-------------+---------+---------+------------------+



In [27]:
df = df.withColumn("BMI", expr("weight_kg / (height_meters * height_meters)"))
df.show()

+--------+-------------+---------+---------+------------------+------------------+
| student|weight_pounds|height_cm|weight_kg|     height_meters|               BMI|
+--------+-------------+---------+---------+------------------+------------------+
|student7|           80|    165.1| 36.28736|             1.651|13.312549228648752|
|student2|          100|   149.86|  45.3592|1.4986000000000002|20.197328530250278|
|student1|           90|   162.56| 40.82328|            1.6256|15.448293591899683|
|student3|           95|   175.26| 43.09124|            1.7526|14.028892161964118|
|student5|           80|    152.4| 36.28736|             1.524|15.623755691955827|
|student6|           85|   157.48| 38.55532|            1.5748|15.546531093062187|
+--------+-------------+---------+---------+------------------+------------------+



In [28]:
df = df.drop("height_inches", 'weight_pounds')
df.show()

+--------+---------+---------+------------------+------------------+
| student|height_cm|weight_kg|     height_meters|               BMI|
+--------+---------+---------+------------------+------------------+
|student7|    165.1| 36.28736|             1.651|13.312549228648752|
|student2|   149.86|  45.3592|1.4986000000000002|20.197328530250278|
|student1|   162.56| 40.82328|            1.6256|15.448293591899683|
|student3|   175.26| 43.09124|            1.7526|14.028892161964118|
|student5|    152.4| 36.28736|             1.524|15.623755691955827|
|student6|   157.48| 38.55532|            1.5748|15.546531093062187|
+--------+---------+---------+------------------+------------------+



In [29]:
from pyspark.sql.functions import col, round 
df = df.withColumn("bmi_rounded", round(col('BMI')))
df.show()

+--------+---------+---------+------------------+------------------+-----------+
| student|height_cm|weight_kg|     height_meters|               BMI|bmi_rounded|
+--------+---------+---------+------------------+------------------+-----------+
|student7|    165.1| 36.28736|             1.651|13.312549228648752|       13.0|
|student2|   149.86|  45.3592|1.4986000000000002|20.197328530250278|       20.0|
|student1|   162.56| 40.82328|            1.6256|15.448293591899683|       15.0|
|student3|   175.26| 43.09124|            1.7526|14.028892161964118|       14.0|
|student5|    152.4| 36.28736|             1.524|15.623755691955827|       16.0|
|student6|   157.48| 38.55532|            1.5748|15.546531093062187|       16.0|
+--------+---------+---------+------------------+------------------+-----------+



In [30]:
df.write.mode("overwrite").parquet("student_exercise.parquet")

In [31]:
spark1.stop()