In [1]:
#Create sparksession
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MySparkApp").master("local[*]").getOrCreate()

In [2]:
sales_df = spark.read.csv("supermarket_sales - Sheet1.csv",header=True)

In [3]:
sales_df.show()

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|     Date| Time|    Payment|  cogs|gross margin percentage|gross income|Rating|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|26.1415|548.9715| 1/5/2019|13:08|    Ewallet|522.83|            4.761904762|     26.1415|   9.1|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|   3.82|   80.22| 3/8/2019|10:29|       Cash|  76.4|            4.761904762|        3.82|   9.6|
|631-41-3108|     A|   Yangon|       Normal|  Male|  Ho

In [4]:
flights_df = spark.read.parquet("flights-1m.parquet")

In [5]:
flights_df.show()

+----------+---------+---------+--------+--------+---------+---------+
|   FL_DATE|DEP_DELAY|ARR_DELAY|AIR_TIME|DISTANCE| DEP_TIME| ARR_TIME|
+----------+---------+---------+--------+--------+---------+---------+
|2006-01-01|        5|       19|     350|    2475| 9.083333|12.483334|
|2006-01-02|      167|      216|     343|    2475|11.783334|15.766666|
|2006-01-03|       -7|       -2|     344|    2475| 8.883333|12.133333|
|2006-01-04|       -5|      -13|     331|    2475| 8.916667|    11.95|
|2006-01-05|       -3|      -17|     321|    2475|     8.95|11.883333|
|2006-01-06|       -4|      -32|     320|    2475| 8.933333|11.633333|
|2006-01-08|       -3|       -2|     346|    2475|     8.95|12.133333|
|2006-01-09|        3|        0|     334|    2475|     9.05|12.166667|
|2006-01-10|       -7|      -21|     334|    2475| 8.883333|11.816667|
|2006-01-11|        8|      -10|     321|    2475| 9.133333|     12.0|
|2006-01-12|       -5|      -27|     321|    2475| 8.916667|11.716666|
|2006-

In [6]:
student_df = spark.read.json("Students_Grading_Dataset.json",multiLine=True)

In [7]:
student_df.show()

+---+---------------+--------------+-----------+--------------------+--------------------------+-------------------+-----------+----------+------+-----+-----------------------+---------+-------------+----------------------+-------------------+--------------+-----------+---------------------+-------------------+----------+--------------------+-----------+
|Age|Assignments_Avg|Attendance (%)| Department|               Email|Extracurricular_Activities|Family_Income_Level|Final_Score|First_Name|Gender|Grade|Internet_Access_at_Home|Last_Name|Midterm_Score|Parent_Education_Level|Participation_Score|Projects_Score|Quizzes_Avg|Sleep_Hours_per_Night|Stress_Level (1-10)|Student_ID|Study_Hours_per_Week|Total_Score|
+---+---------------+--------------+-----------+--------------------+--------------------------+-------------------+-----------+----------+------+-----+-----------------------+---------+-------------+----------------------+-------------------+--------------+-----------+----------------

## Missing data Handling in PySpark

In [8]:
sales_df.count()

1000

In [9]:
sales_df.dropna(how="any")

DataFrame[Invoice ID: string, Branch: string, City: string, Customer type: string, Gender: string, Product line: string, Unit price: string, Quantity: string, Tax 5%: string, Total: string, Date: string, Time: string, Payment: string, cogs: string, gross margin percentage: string, gross income: string, Rating: string]

In [10]:
sales_df.count()

1000

In [11]:
flights_df.fillna(value=2)

DataFrame[FL_DATE: date, DEP_DELAY: smallint, ARR_DELAY: smallint, AIR_TIME: smallint, DISTANCE: smallint, DEP_TIME: float, ARR_TIME: float]

In [12]:
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy="median",inputCols = ["Attendance (%)"],outputCols = ["Attendance (%) out"])

In [13]:
model = imputer.fit(student_df)
student_df_imputed = model.transform(student_df)

In [14]:
student_df_imputed.show()

+---+---------------+--------------+-----------+--------------------+--------------------------+-------------------+-----------+----------+------+-----+-----------------------+---------+-------------+----------------------+-------------------+--------------+-----------+---------------------+-------------------+----------+--------------------+-----------+------------------+
|Age|Assignments_Avg|Attendance (%)| Department|               Email|Extracurricular_Activities|Family_Income_Level|Final_Score|First_Name|Gender|Grade|Internet_Access_at_Home|Last_Name|Midterm_Score|Parent_Education_Level|Participation_Score|Projects_Score|Quizzes_Avg|Sleep_Hours_per_Night|Stress_Level (1-10)|Student_ID|Study_Hours_per_Week|Total_Score|Attendance (%) out|
+---+---------------+--------------+-----------+--------------------+--------------------------+-------------------+-----------+----------+------+-----+-----------------------+---------+-------------+----------------------+-------------------+-----

## Data Cache and Performance improvement

In [15]:
sales_df.cache()

DataFrame[Invoice ID: string, Branch: string, City: string, Customer type: string, Gender: string, Product line: string, Unit price: string, Quantity: string, Tax 5%: string, Total: string, Date: string, Time: string, Payment: string, cogs: string, gross margin percentage: string, gross income: string, Rating: string]

In [17]:
import pandas as pd

In [19]:
p_df = pd.DataFrame({'id':[1,2,3], 'value':[10,20,30]})
df_spark = spark.createDataFrame(p_df)
pdf_new = df_spark.toPandas()

In [20]:
print(type(df_spark))
print(type(pdf_new))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>
