In [33]:
# findspark.init() is called before the pyspark imports below so that the
# pyspark package can be located and imported in this kernel.
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
import warnings
# NOTE(review): this silences *every* warning for the rest of the kernel
# session (pandas, pyarrow, pyspark, ...). Consider narrowing it with
# category=/module= to the specific warning that motivated it.
warnings.filterwarnings('ignore')

In [18]:
# Local SparkSession using all available cores ("local[*]").
# spark.sql.files.ignoreCorruptFiles=true makes file-based reads skip files
# Spark cannot read instead of failing the job.
# NOTE(review): in an ETL job that can silently drop input data; confirm intended.
spark = (
    SparkSession.builder
    .appName('ETL Using Spark')
    .master('local[*]')
    .config("spark.sql.files.ignoreCorruptFiles", "true")
    .getOrCreate()
)

In [19]:
# Toy input rows: (student, height_inches, weight_pounds).
# Data-quality issues present on purpose or not, later steps should be aware:
#   * one row has an empty student name;
#   * student3 and student7 each appear twice with identical values.
data = [
    ("student1", 64,  90),
    ("student2", 59, 100),
    ("student3", 69,  95),
    ("",         70, 110),
    ("student5", 60,  80),
    ("student3", 69,  95),
    ("student6", 62,  85),
    ("student7", 65,  80),
    ("student7", 65,  80),
]

In [21]:
# Only column names are supplied; Spark infers the column types from the tuples.
df = spark.createDataFrame(data, schema=["student", "height_inches", "weight_pounds"])

In [22]:
# Print up to 20 rows (the default), truncating long cell values.
df.show(n=20, truncate=True)

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student1|           64|           90|
|student2|           59|          100|
|student3|           69|           95|
|        |           70|          110|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|student7|           65|           80|
|student7|           65|           80|
+--------+-------------+-------------+



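* Write to CSV: round-trip the DataFrame through a CSV file on disk. Note that the empty `student` value comes back as `NULL`, because Spark's CSV reader loads empty fields as null.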

In [23]:
# Convert the Spark DataFrame to pandas (this collects every row to the
# driver, which is fine for this small toy dataset) and write it out as CSV.
#
# The file is written to the SAME absolute path that the next cell reads back
# with Spark, so the round trip no longer depends on the kernel's current
# working directory. (Writing a bare "student-hw.csv" only matched that read
# path when the notebook happened to be started from this folder.)
# TODO: move the base directory into a single config cell at the top.
csv_path = "D:/PROGRAMMING/FULL STACK DATA SCIENCE/JupLab/Scrapping/student-hw.csv"
pandas_df = df.toPandas()

# index=False keeps the pandas RangeIndex out of the file, so only the
# DataFrame's own columns are written. The empty student name is written as an
# empty field, which Spark's CSV reader loads back as NULL.
pandas_df.to_csv(csv_path, index=False)

In [25]:
# Load the student CSV back into Spark: the first line supplies the column
# names, and inferSchema asks Spark to derive column types from the data.
csv_uri = "file:///D:/PROGRAMMING/FULL STACK DATA SCIENCE/JupLab/Scrapping/student-hw.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(csv_uri)
)

# Show the loaded rows.
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student1|           64|           90|
|student2|           59|          100|
|student3|           69|           95|
|    NULL|           70|          110|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|student7|           65|           80|
|student7|           65|           80|
+--------+-------------+-------------+



In [30]:
# Convert DataFrame to Pandas DataFrame
# Collect the CSV-loaded Spark DataFrame into pandas and persist it as Parquet
# via pyarrow. index=False keeps the pandas index out of the file; the NULL
# student name is stored as a missing value.
parquet_path = "D:/PROGRAMMING/FULL STACK DATA SCIENCE/JupLab/Scrapping/student-hw.parquet"
df_pandas = df.toPandas()
df_pandas.to_parquet(parquet_path, index=False, engine="pyarrow")


In [31]:
# Reload the Parquet file with Spark; column names and types come from the file.
df = spark.read.format("parquet").load(
    "file:///D:/PROGRAMMING/FULL STACK DATA SCIENCE/JupLab/Scrapping/student-hw.parquet"
)

In [32]:
# Print up to 20 rows (the default), truncating long cell values.
df.show(n=20, truncate=True)

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student1|           64|           90|
|student2|           59|          100|
|student3|           69|           95|
|    NULL|           70|          110|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|student7|           65|           80|
|student7|           65|           80|
+--------+-------------+-------------+



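# Transform

Convert height to centimetres and weight to kilograms, drop the original imperial columns, and shorten the height column name.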

In [35]:
# 1 inch = 2.54 cm. The factor is written inside the SQL expression, so Spark
# parses it as a DECIMAL literal and the new column is DECIMAL (note the
# fixed two-digit scale, e.g. 177.80, in the output) rather than DOUBLE.
df = df.withColumn(
    "height_centimeters",
    expr("height_inches * 2.54"),
)
df.show()

+--------+-------------+-------------+------------------+
| student|height_inches|weight_pounds|height_centimeters|
+--------+-------------+-------------+------------------+
|student1|           64|           90|            162.56|
|student2|           59|          100|            149.86|
|student3|           69|           95|            175.26|
|    NULL|           70|          110|            177.80|
|student5|           60|           80|            152.40|
|student3|           69|           95|            175.26|
|student6|           62|           85|            157.48|
|student7|           65|           80|            165.10|
|student7|           65|           80|            165.10|
+--------+-------------+-------------+------------------+



In [36]:
# Pounds -> kilograms using the 6-significant-digit factor 0.453592
# (the exact definition is 0.45359237 kg/lb). Kept as a SQL literal so the
# result is a DECIMAL column, like height_centimeters.
df = df.withColumn(
    "weight_kg",
    expr("weight_pounds * 0.453592"),
)
df.show()

+--------+-------------+-------------+------------------+---------+
| student|height_inches|weight_pounds|height_centimeters|weight_kg|
+--------+-------------+-------------+------------------+---------+
|student1|           64|           90|            162.56|40.823280|
|student2|           59|          100|            149.86|45.359200|
|student3|           69|           95|            175.26|43.091240|
|    NULL|           70|          110|            177.80|49.895120|
|student5|           60|           80|            152.40|36.287360|
|student3|           69|           95|            175.26|43.091240|
|student6|           62|           85|            157.48|38.555320|
|student7|           65|           80|            165.10|36.287360|
|student7|           65|           80|            165.10|36.287360|
+--------+-------------+-------------+------------------+---------+



In [37]:
# The imperial source columns are no longer needed now that the metric
# columns exist.
df = df.drop(
    "height_inches",
    "weight_pounds",
)
df.show()

+--------+------------------+---------+
| student|height_centimeters|weight_kg|
+--------+------------------+---------+
|student1|            162.56|40.823280|
|student2|            149.86|45.359200|
|student3|            175.26|43.091240|
|    NULL|            177.80|49.895120|
|student5|            152.40|36.287360|
|student3|            175.26|43.091240|
|student6|            157.48|38.555320|
|student7|            165.10|36.287360|
|student7|            165.10|36.287360|
+--------+------------------+---------+



In [38]:
# Shorten the derived height column name to match the "weight_kg" style.
df = df.withColumnRenamed(existing="height_centimeters", new="height_cm")
df.show()

+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student1|   162.56|40.823280|
|student2|   149.86|45.359200|
|student3|   175.26|43.091240|
|    NULL|   177.80|49.895120|
|student5|   152.40|36.287360|
|student3|   175.26|43.091240|
|student6|   157.48|38.555320|
|student7|   165.10|36.287360|
|student7|   165.10|36.287360|
+--------+---------+---------+



In [39]:
# Stop the SparkSession's underlying SparkContext; `spark` cannot run further
# jobs after this call, so this must remain the last cell.
spark.stop()