# Setup Spark Session

In [1]:
import findspark
findspark.init('C:/users/ychen/spark')

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ops').getOrCreate()

# Basic Operations

In [2]:
df = spark.read.csv('../datasets/rpt_hot_skills.csv', inferSchema=True, header=True)

In [3]:
df.take(2)

[Row(minimum=725.0062847, gradient=7.32e-07, curve=2, master_Term_id=2, best_fit_flag='N', negative_curve_flag='N'),
 Row(minimum=725.0064017, gradient=7.34e-07, curve=1, master_Term_id=2, best_fit_flag='N', negative_curve_flag='N')]

In [4]:
df.printSchema()

root
 |-- minimum: double (nullable = true)
 |-- gradient: double (nullable = true)
 |-- curve: integer (nullable = true)
 |-- master_Term_id: integer (nullable = true)
 |-- best_fit_flag: string (nullable = true)
 |-- negative_curve_flag: string (nullable = true)



In [5]:
df.show(3)

+-----------+--------+-----+--------------+-------------+-------------------+
|    minimum|gradient|curve|master_Term_id|best_fit_flag|negative_curve_flag|
+-----------+--------+-----+--------------+-------------+-------------------+
|725.0062847| 7.32E-7|    2|             2|            N|                  N|
|725.0064017| 7.34E-7|    1|             2|            N|                  N|
|725.0272307| 7.47E-7|    3|             2|            N|                  N|
+-----------+--------+-----+--------------+-------------+-------------------+
only showing top 3 rows



# Filtering

In [7]:
df.filter('minimum < 800').select(['gradient', 'curve']).show()

+---------+-----+
| gradient|curve|
+---------+-----+
|  7.32E-7|    2|
|  7.34E-7|    1|
|  7.47E-7|    3|
|      0.0|    4|
|  5.34E-7|    2|
|  5.34E-7|    1|
|   5.4E-7|    3|
|  5.05E-7|    2|
|  5.04E-7|    1|
|  1.22E-5|    3|
|  -3.9E-6|    2|
|   7.5E-7|    1|
|-5.36E-12|    3|
|-1.85E-13|    4|
| -3.78E-8|    2|
|  4.26E-8|    1|
| -3.69E-8|    3|
| -9.22E-9|    4|
|  2.97E-6|    3|
|   2.8E-6|    4|
+---------+-----+
only showing top 20 rows



In [8]:
df.filter(df['minimum'] < 800).select(['gradient', 'curve']).show()

+---------+-----+
| gradient|curve|
+---------+-----+
|  7.32E-7|    2|
|  7.34E-7|    1|
|  7.47E-7|    3|
|      0.0|    4|
|  5.34E-7|    2|
|  5.34E-7|    1|
|   5.4E-7|    3|
|  5.05E-7|    2|
|  5.04E-7|    1|
|  1.22E-5|    3|
|  -3.9E-6|    2|
|   7.5E-7|    1|
|-5.36E-12|    3|
|-1.85E-13|    4|
| -3.78E-8|    2|
|  4.26E-8|    1|
| -3.69E-8|    3|
| -9.22E-9|    4|
|  2.97E-6|    3|
|   2.8E-6|    4|
+---------+-----+
only showing top 20 rows



In [9]:
df.filter((df['minimum'] > 200) & ~(df['minimum'] < 200)).show()

+-----------+---------+-----+--------------+-------------+-------------------+
|    minimum| gradient|curve|master_Term_id|best_fit_flag|negative_curve_flag|
+-----------+---------+-----+--------------+-------------+-------------------+
|725.0062847|  7.32E-7|    2|             2|            N|                  N|
|725.0064017|  7.34E-7|    1|             2|            N|                  N|
|725.0272307|  7.47E-7|    3|             2|            N|                  N|
|670.5105497|      0.0|    4|             2|            Y|                  N|
|1103.599293|  -6.5E-6|    2|             4|            N|                  Y|
|1832.713573|  1.85E-7|    1|             4|            N|                  N|
|1010.477347|  -1.9E-6|    3|             4|            Y|                  Y|
|1098.184533| -6.26E-6|    4|             4|            N|                  Y|
|2089.663188| -7.71E-6|    2|             5|            N|                  Y|
|2345.643129|  8.37E-7|    1|             5|        

In [10]:
df.filter(df['best_fit_flag'] == 'Y').show()

+-----------+----------+-----+--------------+-------------+-------------------+
|    minimum|  gradient|curve|master_Term_id|best_fit_flag|negative_curve_flag|
+-----------+----------+-----+--------------+-------------+-------------------+
|670.5105497|       0.0|    4|             2|            Y|                  N|
|1010.477347|   -1.9E-6|    3|             4|            Y|                  Y|
|1767.193057| -6.78E-61|    4|             5|            Y|                  Y|
|1650.630413| -1.13E-23|    4|             7|            Y|                  Y|
|2040.097512| -8.57E-10|    4|             8|            Y|                  Y|
|1914.218778|7.05359E-4|    4|           153|            Y|                  N|
|4032.903568|       0.0|    4|            14|            Y|                  N|
|1579.599391|  -1.72E-5|    4|            20|            Y|                  Y|
|1842.351388|       0.0|    4|            22|            Y|                  N|
|2979.215589|       0.0|    4|          

In [13]:
result = df.filter(df['best_fit_flag'] == 'Y').collect()

In [15]:
row = result[0]
row.asDict()['best_fit_flag']

'Y'

# Groupby and Common Aggregation

In [16]:
df.groupBy('best_fit_flag')

<pyspark.sql.group.GroupedData at 0x1c0a7fc97c8>

In [17]:
df.groupBy('best_fit_flag').mean().show()

+-------------+------------------+--------------------+-----------------+-------------------+
|best_fit_flag|      avg(minimum)|       avg(gradient)|       avg(curve)|avg(master_Term_id)|
+-------------+------------------+--------------------+-----------------+-------------------+
|            N|2455.7339148680194|4.213025850490093...|2.110062893081761| 128.75471698113208|
|            Y|1976.4787205345285|1.844141975358216...|3.669811320754717| 128.75471698113208|
+-------------+------------------+--------------------+-----------------+-------------------+



In [18]:
df.groupBy('best_fit_flag').sum().show()

+-------------+------------------+--------------------+----------+-------------------+
|best_fit_flag|      sum(minimum)|       sum(gradient)|sum(curve)|sum(master_Term_id)|
+-------------+------------------+--------------------+----------+-------------------+
|            N| 780923.3849280302|0.013397422204558498|       671|              40944|
|            Y|209506.74437666003|0.001954790493879...|       389|              13648|
+-------------+------------------+--------------------+----------+-------------------+



In [19]:
df.groupBy('best_fit_flag').count().show()

+-------------+-----+
|best_fit_flag|count|
+-------------+-----+
|            N|  318|
|            Y|  106|
+-------------+-----+



In [20]:
df.agg({'gradient': 'min'}).show()

+-------------+
|min(gradient)|
+-------------+
|  -2.06384E-4|
+-------------+



In [21]:
df.agg({'curve': 'sum'}).show()

+----------+
|sum(curve)|
+----------+
|      1060|
+----------+



In [22]:
group_data = df.groupBy('best_fit_flag')

In [23]:
group_data.agg({'gradient': 'sum'}).show()

+-------------+--------------------+
|best_fit_flag|       sum(gradient)|
+-------------+--------------------+
|            N|0.013397422204558498|
|            Y|0.001954790493879...|
+-------------+--------------------+



# More Operation Functions

In [24]:
from pyspark.sql.functions import countDistinct, avg, stddev
df.select(countDistinct('curve')).show()

+---------------------+
|count(DISTINCT curve)|
+---------------------+
|                    4|
+---------------------+



In [25]:
df.select(avg('gradient').alias('Average Gradient')).show()

+--------------------+
|    Average Gradient|
+--------------------+
|3.620804881707126E-5|
+--------------------+



In [26]:
df.select(stddev('gradient')).show()

+---------------------+
|stddev_samp(gradient)|
+---------------------+
| 3.654757722030033E-4|
+---------------------+



In [27]:
from pyspark.sql.functions import format_number
grad_std = df.select(stddev('gradient').alias('std'))

In [28]:
grad_std.show()

+--------------------+
|                 std|
+--------------------+
|3.654757722030033E-4|
+--------------------+



In [29]:
grad_std.select(format_number('std', 2).alias('std')).show()

+----+
| std|
+----+
|0.00|
+----+



# Order Rows

In [30]:
df.orderBy('gradient').show()

+-----------+-----------+-----+--------------+-------------+-------------------+
|    minimum|   gradient|curve|master_Term_id|best_fit_flag|negative_curve_flag|
+-----------+-----------+-----+--------------+-------------+-------------------+
|8807.157199|-2.06384E-4|    2|           138|            N|                  Y|
|10124.73732|-1.57967E-4|    2|           238|            N|                  Y|
|10128.97156|-1.49556E-4|    3|           238|            N|                  Y|
|8729.035826|-1.43675E-4|    3|           138|            N|                  Y|
|2263.437028|    -6.8E-5|    4|           156|            Y|                  Y|
|6911.098422|   -5.65E-5|    2|           126|            N|                  Y|
|3693.307337|   -5.31E-5|    2|            28|            N|                  Y|
| 6914.47351|    -5.1E-5|    3|           126|            N|                  Y|
|1319.305954|   -3.63E-5|    2|            31|            N|                  Y|
|1319.499305|   -3.63E-5|   

In [32]:
df.orderBy(df['gradient'].desc()).show()

+-----------+-----------+-----+--------------+-------------+-------------------+
|    minimum|   gradient|curve|master_Term_id|best_fit_flag|negative_curve_flag|
+-----------+-----------+-----+--------------+-------------+-------------------+
|9053.930323|0.006378907|    3|            66|            N|                  N|
|6263.383618|0.003852375|    3|           142|            N|                  N|
|1914.218778| 7.05359E-4|    4|           153|            Y|                  N|
|  2083.4036| 4.26418E-4|    3|            35|            N|                  N|
|4273.196412| 3.63939E-4|    3|           118|            N|                  N|
|1850.704919| 2.47873E-4|    3|           148|            N|                  N|
|2590.531473| 2.42125E-4|    3|           251|            Y|                  N|
|2620.305292|  2.0841E-4|    3|           211|            N|                  N|
|1877.616765| 1.91045E-4|    3|           121|            N|                  N|
|2155.647897|  1.8596E-4|   

# Missing Value

In [39]:
df = spark.read.csv('../datasets/test.csv', inferSchema = True, header=True)
df.show()

+---+---------+-------+-----+--------------------+------+-------+-----------+----------+--------+----------+------+--------+
| ID|     Date|Country|VehID|         Destination|KmInit|KmFinal| FuelBought|AmountFuel|CashFuel|AmountCash|FuelKm|DriverID|
+---+---------+-------+-----+--------------------+------+-------+-----------+----------+--------+----------+------+--------+
| 30|1/11/2008|      1|   12|                null| 67029|  67429|        0.0|         0|       0|         0|     0|       1|
| 31|1/14/2008|      1|   12|                null| 67429|  67848|        0.0|         0|       0|         0|     0|       1|
| 32|1/14/2008|      1|   12|                null| 67848|  67850|        0.0|         0|       0|         0|     0|       1|
| 33|1/17/2008|      1|   12|                null| 67850|  68072|        0.0|         0|       0|         0|     0|       1|
| 34|1/19/2008|      1|   12|                null| 68072|  68186|        0.0|         0|       0|         0|     0|       1|


In [40]:
df.na.drop().show()

+---+---------+-------+-----+--------------------+------+-------+----------+----------+--------+----------+------+--------+
| ID|     Date|Country|VehID|         Destination|KmInit|KmFinal|FuelBought|AmountFuel|CashFuel|AmountCash|FuelKm|DriverID|
+---+---------+-------+-----+--------------------+------+-------+----------+----------+--------+----------+------+--------+
| 37|1/23/2008|      1|   12|Kakuru - Within Town| 68418|  68501|       0.0|         0|       0|         0|     0|       1|
| 38|1/24/2008|      1|   12|Nakuru - Mogotio ...| 68501|  68737|       0.0|         0|       0|         0|     0|       1|
| 39|1/26/2008|      1|   12|Nakuru - IDP Camp...| 68737|  69389|       0.0|         0|       0|         0|     0|       1|
+---+---------+-------+-----+--------------------+------+-------+----------+----------+--------+----------+------+--------+



In [41]:
df.na.drop(thresh=13).show()

+---+---------+-------+-----+--------------------+------+-------+----------+----------+--------+----------+------+--------+
| ID|     Date|Country|VehID|         Destination|KmInit|KmFinal|FuelBought|AmountFuel|CashFuel|AmountCash|FuelKm|DriverID|
+---+---------+-------+-----+--------------------+------+-------+----------+----------+--------+----------+------+--------+
| 37|1/23/2008|      1|   12|Kakuru - Within Town| 68418|  68501|       0.0|         0|       0|         0|     0|       1|
| 38|1/24/2008|      1|   12|Nakuru - Mogotio ...| 68501|  68737|       0.0|         0|       0|         0|     0|       1|
| 39|1/26/2008|      1|   12|Nakuru - IDP Camp...| 68737|  69389|       0.0|         0|       0|         0|     0|       1|
+---+---------+-------+-----+--------------------+------+-------+----------+----------+--------+----------+------+--------+



In [42]:
df.na.drop(how = 'all').show()

+---+---------+-------+-----+--------------------+------+-------+-----------+----------+--------+----------+------+--------+
| ID|     Date|Country|VehID|         Destination|KmInit|KmFinal| FuelBought|AmountFuel|CashFuel|AmountCash|FuelKm|DriverID|
+---+---------+-------+-----+--------------------+------+-------+-----------+----------+--------+----------+------+--------+
| 30|1/11/2008|      1|   12|                null| 67029|  67429|        0.0|         0|       0|         0|     0|       1|
| 31|1/14/2008|      1|   12|                null| 67429|  67848|        0.0|         0|       0|         0|     0|       1|
| 32|1/14/2008|      1|   12|                null| 67848|  67850|        0.0|         0|       0|         0|     0|       1|
| 33|1/17/2008|      1|   12|                null| 67850|  68072|        0.0|         0|       0|         0|     0|       1|
| 34|1/19/2008|      1|   12|                null| 68072|  68186|        0.0|         0|       0|         0|     0|       1|


In [43]:
df.na.drop(subset="Destination").show()

+---+---------+-------+-----+--------------------+------+-------+----------+----------+--------+----------+------+--------+
| ID|     Date|Country|VehID|         Destination|KmInit|KmFinal|FuelBought|AmountFuel|CashFuel|AmountCash|FuelKm|DriverID|
+---+---------+-------+-----+--------------------+------+-------+----------+----------+--------+----------+------+--------+
| 37|1/23/2008|      1|   12|Kakuru - Within Town| 68418|  68501|       0.0|         0|       0|         0|     0|       1|
| 38|1/24/2008|      1|   12|Nakuru - Mogotio ...| 68501|  68737|       0.0|         0|       0|         0|     0|       1|
| 39|1/26/2008|      1|   12|Nakuru - IDP Camp...| 68737|  69389|       0.0|         0|       0|         0|     0|       1|
+---+---------+-------+-----+--------------------+------+-------+----------+----------+--------+----------+------+--------+



In [44]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Country: integer (nullable = true)
 |-- VehID: integer (nullable = true)
 |-- Destination: string (nullable = true)
 |-- KmInit: integer (nullable = true)
 |-- KmFinal: integer (nullable = true)
 |-- FuelBought: double (nullable = true)
 |-- AmountFuel: integer (nullable = true)
 |-- CashFuel: integer (nullable = true)
 |-- AmountCash: integer (nullable = true)
 |-- FuelKm: integer (nullable = true)
 |-- DriverID: integer (nullable = true)



In [46]:
df.na.fill("Unknown").show()

+---+---------+-------+-----+--------------------+------+-------+-----------+----------+--------+----------+------+--------+
| ID|     Date|Country|VehID|         Destination|KmInit|KmFinal| FuelBought|AmountFuel|CashFuel|AmountCash|FuelKm|DriverID|
+---+---------+-------+-----+--------------------+------+-------+-----------+----------+--------+----------+------+--------+
| 30|1/11/2008|      1|   12|             Unknown| 67029|  67429|        0.0|         0|       0|         0|     0|       1|
| 31|1/14/2008|      1|   12|             Unknown| 67429|  67848|        0.0|         0|       0|         0|     0|       1|
| 32|1/14/2008|      1|   12|             Unknown| 67848|  67850|        0.0|         0|       0|         0|     0|       1|
| 33|1/17/2008|      1|   12|             Unknown| 67850|  68072|        0.0|         0|       0|         0|     0|       1|
| 34|1/19/2008|      1|   12|             Unknown| 68072|  68186|        0.0|         0|       0|         0|     0|       1|


In [47]:
df.na.fill('fill value', subset = 'Destination').show()

+---+---------+-------+-----+--------------------+------+-------+-----------+----------+--------+----------+------+--------+
| ID|     Date|Country|VehID|         Destination|KmInit|KmFinal| FuelBought|AmountFuel|CashFuel|AmountCash|FuelKm|DriverID|
+---+---------+-------+-----+--------------------+------+-------+-----------+----------+--------+----------+------+--------+
| 30|1/11/2008|      1|   12|          fill value| 67029|  67429|        0.0|         0|       0|         0|     0|       1|
| 31|1/14/2008|      1|   12|          fill value| 67429|  67848|        0.0|         0|       0|         0|     0|       1|
| 32|1/14/2008|      1|   12|          fill value| 67848|  67850|        0.0|         0|       0|         0|     0|       1|
| 33|1/17/2008|      1|   12|          fill value| 67850|  68072|        0.0|         0|       0|         0|     0|       1|
| 34|1/19/2008|      1|   12|          fill value| 68072|  68186|        0.0|         0|       0|         0|     0|       1|


# Dates and Timestamps

In [65]:
df = spark.read.csv('../datasets/test.csv', inferSchema = True, header=True)

In [66]:
df.take(1)

[Row(ID=30, Date='1/11/2008', Country=1, VehID=12, Destination=None, KmInit=67029, KmFinal=67429, FuelBought=0.0, AmountFuel=0, CashFuel=0, AmountCash=0, FuelKm=0, DriverID=1)]

In [67]:
df.select('Date', 'KmFinal').show()

+---------+-------+
|     Date|KmFinal|
+---------+-------+
|1/11/2008|  67429|
|1/14/2008|  67848|
|1/14/2008|  67850|
|1/17/2008|  68072|
|1/19/2008|  68186|
|1/22/2008|  68418|
|1/23/2008|  68501|
|1/24/2008|  68737|
|1/26/2008|  69389|
| 1/6/2008|  49219|
+---------+-------+



In [68]:
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format, to_timestamp)

In [69]:
df = df.select(to_timestamp(df.Date, 'M/d/yyyy').alias('dt'), 'KmFinal')

In [70]:
df.select(dayofmonth(df['dt'])).show(5)

+--------------+
|dayofmonth(dt)|
+--------------+
|            11|
|            14|
|            14|
|            17|
|            19|
+--------------+
only showing top 5 rows



In [72]:
df.select(hour(df['dt'])).show(5)

+--------+
|hour(dt)|
+--------+
|       0|
|       0|
|       0|
|       0|
|       0|
+--------+
only showing top 5 rows



In [73]:
df.select(month(df['dt'])).show(5)

+---------+
|month(dt)|
+---------+
|        1|
|        1|
|        1|
|        1|
|        1|
+---------+
only showing top 5 rows



In [74]:
df.select(year(df['dt'])).show(5)

+--------+
|year(dt)|
+--------+
|    2008|
|    2008|
|    2008|
|    2008|
|    2008|
+--------+
only showing top 5 rows



In [75]:
new_df = df.withColumn('Year', year(df['dt']))

In [76]:
result = new_df.groupBy('Year').mean().select(['Year', 'avg(KmFinal)'])
result.show()

+----+------------+
|Year|avg(KmFinal)|
+----+------------+
|2008|     66364.9|
+----+------------+



In [77]:
new_result = result.withColumnRenamed('avg(KmFinal)', 'Average KM Final')
new_result.show()

+----+----------------+
|Year|Average KM Final|
+----+----------------+
|2008|         66364.9|
+----+----------------+



In [79]:
new_result.select(['Year', format_number('Average KM Final', 2).alias('Average KM Final')]).show()

+----+----------------+
|Year|Average KM Final|
+----+----------------+
|2008|       66,364.90|
+----+----------------+



# Use SQL

In [83]:
df = spark.read.csv('../datasets/test.csv', inferSchema = True, header=True)
df.createOrReplaceTempView("test")

In [87]:
df1 = spark.sql("select * from test")
df1.show()

+---+---------+-------+-----+--------------------+------+-------+-----------+----------+--------+----------+------+--------+
| ID|     Date|Country|VehID|         Destination|KmInit|KmFinal| FuelBought|AmountFuel|CashFuel|AmountCash|FuelKm|DriverID|
+---+---------+-------+-----+--------------------+------+-------+-----------+----------+--------+----------+------+--------+
| 30|1/11/2008|      1|   12|                null| 67029|  67429|        0.0|         0|       0|         0|     0|       1|
| 31|1/14/2008|      1|   12|                null| 67429|  67848|        0.0|         0|       0|         0|     0|       1|
| 32|1/14/2008|      1|   12|                null| 67848|  67850|        0.0|         0|       0|         0|     0|       1|
| 33|1/17/2008|      1|   12|                null| 67850|  68072|        0.0|         0|       0|         0|     0|       1|
| 34|1/19/2008|      1|   12|                null| 68072|  68186|        0.0|         0|       0|         0|     0|       1|


In [88]:
df1 = spark.sql("select VehID, count(*) as count from test group by VehID")
df1.show()

+-----+-----+
|VehID|count|
+-----+-----+
|   12|    9|
|   35|    1|
+-----+-----+



# Convert to Pandas DataFrame

In [89]:
df = spark.read.csv('../datasets/test.csv', inferSchema = True, header=True)
df.toPandas().head()

Unnamed: 0,ID,Date,Country,VehID,Destination,KmInit,KmFinal,FuelBought,AmountFuel,CashFuel,AmountCash,FuelKm,DriverID
0,30,1/11/2008,1.0,12,,67029,67429,0.0,0,0,0,0,1
1,31,1/14/2008,1.0,12,,67429,67848,0.0,0,0,0,0,1
2,32,1/14/2008,1.0,12,,67848,67850,0.0,0,0,0,0,1
3,33,1/17/2008,1.0,12,,67850,68072,0.0,0,0,0,0,1
4,34,1/19/2008,1.0,12,,68072,68186,0.0,0,0,0,0,1


In [94]:
df_pd = df.toPandas()
df_pd[['Date','KmFinal']].head()

Unnamed: 0,Date,KmFinal
0,1/11/2008,67429
1,1/14/2008,67848
2,1/14/2008,67850
3,1/17/2008,68072
4,1/19/2008,68186


__Please note: the dataset will be collected at the local memory mode from distributed memory mode after toPandas() function called.__