In [4]:
import warnings


def warn(*args, **kwargs):
    """No-op stand-in for ``warnings.warn``: accepts any arguments and discards them."""
    return None


# Silence warnings in two ways: register a blanket "ignore" filter, and replace
# warnings.warn itself with the no-op defined above.
warnings.filterwarnings('ignore')
warnings.warn = warn

In [5]:
from pyspark.sql import SparkSession

# Get (or create) the SparkSession used by the rest of this notebook.
# Any warnings printed while the session starts up can be ignored.
spark = (
    SparkSession.builder
    .appName("ETL using Spark")
    .getOrCreate()
)

In [6]:
# Sample records as (student id, height in inches, weight in pounds) tuples.
# Some rows are duplicated on purpose, and one row has an empty student id.
data = [
    ("student1", 64, 90),
    ("student2", 59, 100),
    ("student3", 69, 95),
    ("", 70, 110),          # empty student id
    ("student5", 60, 80),
    ("student3", 69, 95),   # duplicate of the earlier student3 row
    ("student6", 62, 85),
    ("student7", 65, 80),
    ("student7", 65, 80),   # duplicate of the previous row
]

In [7]:
# Build a Spark DataFrame from the list of tuples; the second argument gives
# the column names in the same order as the tuple fields.
df = spark.createDataFrame(
    data,
    ["student", "height_inches", "weight_pounds"],
)

In [8]:
# Display the DataFrame's rows to confirm the data was loaded as expected
df.show()

                                                                                

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student1|           64|           90|
|student2|           59|          100|
|student3|           69|           95|
|        |           70|          110|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|student7|           65|           80|
|student7|           65|           80|
+--------+-------------+-------------+



In [6]:
# Write the DataFrame to "student-hw.csv" with a header row, overwriting any previous output
df.write.mode("overwrite").csv("student-hw.csv", header=True)

                                                                                

In [9]:
# Verify the csv file by loading it back: the header row supplies the column
# names and the column types are inferred from the data.
df = spark.read.csv(
    "student-hw.csv",
    header=True,
    inferSchema=True,
)
df.show()

                                                                                

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|    NULL|           70|          110|
+--------+-------------+-------------+



## Task 2 - Read from a CSV file and write to a Parquet file


In [10]:
# Count the rows in the DataFrame; the notebook displays the returned number
df.count()

9

In [16]:
# Remove duplicated rows (no column subset is given), then display what is left
df = df.dropDuplicates()
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
|student5|           60|           80|
|student6|           62|           85|
|    NULL|           70|          110|
+--------+-------------+-------------+



In [15]:
# Count the rows currently in the DataFrame
df.count()

7

In [17]:
# Drop rows that contain null values (here, the row whose student id is NULL),
# then display the remaining rows.
df = df.dropna()
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
|student5|           60|           80|
|student6|           62|           85|
+--------+-------------+-------------+



In [14]:
# Save the DataFrame in Parquet format at "student-hw.parquet", overwriting any previous output
df.write.mode("overwrite").parquet("student-hw.parquet")

                                                                                

In [18]:
!!ls -l student-hw.parquet

['3477 Jps',
 '2999 SparkSubmit',
 '1016 Worker',
 '955 Master',
 '3374 SparkSubmit']

In [19]:
# Collapse the DataFrame into a single partition (repartition(1)) before writing it out again
df = df.repartition(1)

In [20]:
# Write the single-partition DataFrame to a separate Parquet path,
# overwriting any previous output at "student-hw-single.parquet"
df.write.mode("overwrite").parquet("student-hw-single.parquet")

In [None]:
# Inspect what was written to student-hw-single.parquet
!ls -l student-hw-single.parquet

In [18]:
# Read the single-partition Parquet output back into a DataFrame
# (this cell only reads; the data is transformed and saved as CSV in later cells)
df = spark.read.parquet("student-hw-single.parquet")

In [22]:
# Display the rows that were read back from Parquet
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
|student5|           60|           80|
|student6|           62|           85|
+--------+-------------+-------------+



Transform the data


In [23]:
# import the expr function that helps in transforming the data
from pyspark.sql.functions import expr

In [24]:
# Convert inches to centimeters: add a new column "height_centimeters" computed
# by the SQL expression height_inches * 2.54, then display the result
df = df.withColumn("height_centimeters", expr("height_inches * 2.54"))
df.show()

+--------+-------------+-------------+------------------+
| student|height_inches|weight_pounds|height_centimeters|
+--------+-------------+-------------+------------------+
|student7|           65|           80|            165.10|
|student2|           59|          100|            149.86|
|student1|           64|           90|            162.56|
|student3|           69|           95|            175.26|
|student5|           60|           80|            152.40|
|student6|           62|           85|            157.48|
+--------+-------------+-------------+------------------+



Convert pounds to kilograms


In [25]:
# Convert pounds to kilograms: add a new column "weight_kg" computed by the
# SQL expression weight_pounds * 0.453592, then display the result
df = df.withColumn("weight_kg", expr("weight_pounds * 0.453592"))
df.show()

+--------+-------------+-------------+------------------+---------+
| student|height_inches|weight_pounds|height_centimeters|weight_kg|
+--------+-------------+-------------+------------------+---------+
|student7|           65|           80|            165.10|36.287360|
|student2|           59|          100|            149.86|45.359200|
|student1|           64|           90|            162.56|40.823280|
|student3|           69|           95|            175.26|43.091240|
|student5|           60|           80|            152.40|36.287360|
|student6|           62|           85|            157.48|38.555320|
+--------+-------------+-------------+------------------+---------+



In [26]:
# Drop the original imperial-unit columns "height_inches" and "weight_pounds",
# keeping only the converted columns, then display the result
df = df.drop("height_inches","weight_pounds")
df.show()

+--------+------------------+---------+
| student|height_centimeters|weight_kg|
+--------+------------------+---------+
|student7|            165.10|36.287360|
|student2|            149.86|45.359200|
|student1|            162.56|40.823280|
|student3|            175.26|43.091240|
|student5|            152.40|36.287360|
|student6|            157.48|38.555320|
+--------+------------------+---------+



In [27]:
# Rename the lengthy column "height_centimeters" to "height_cm" so it matches
# the short style of "weight_kg", then display the result
df = df.withColumnRenamed("height_centimeters","height_cm")
df.show()

+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student7|   165.10|36.287360|
|student2|   149.86|45.359200|
|student1|   162.56|40.823280|
|student3|   175.26|43.091240|
|student5|   152.40|36.287360|
|student6|   157.48|38.555320|
+--------+---------+---------+



In [28]:
# Save the transformed DataFrame to "student_transformed.csv" with a header row,
# overwriting any previous output
df.write.mode("overwrite").csv("student_transformed.csv", header=True)

In [29]:
# Verify the saved csv file by loading it back: the header row supplies the
# column names and the column types are inferred from the data.
df = spark.read.csv(
    "student_transformed.csv",
    header=True,
    inferSchema=True,
)
df.show()

+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student7|    165.1| 36.28736|
|student2|   149.86|  45.3592|
|student1|   162.56| 40.82328|
|student3|   175.26| 43.09124|
|student5|    152.4| 36.28736|
|student6|   157.48| 38.55532|
+--------+---------+---------+



In [19]:
# Stop the SparkSession now that all the ETL steps are done
spark.stop()