### Spark DataFrame Basic Operations

In [76]:
from pyspark.sql import SparkSession

In [77]:
spark = SparkSession.builder.appName('ops').getOrCreate()

In [78]:
df = spark.read.csv("appl_stock.csv", inferSchema=True, header=True)

In [79]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [80]:
df.show(4)

+----------+----------+----------+------------------+----------+---------+------------------+
|      Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+----------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|    210.58|119282800|          27.28265|
+----------+----------+----------+------------------+----------+---------+------------------+
only showing top 4 rows



In [81]:
df.head(3)

[Row(Date=datetime.date(2010, 1, 4), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.date(2010, 1, 5), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
 Row(Date=datetime.date(2010, 1, 6), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004)]

In [82]:
df.head()

Row(Date=datetime.date(2010, 1, 4), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [83]:
df.filter("Close < 500").select(["Open","Close"]).show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [84]:
df.filter((df['Close'] < 200) & (df['Open'] > 200)).show()

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



In [85]:
df.filter((df['Close'] < 200) & ~(df['Open'] > 200)).show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-02-01|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.375532999999997|
|2010-02-03|        195.169994|        200.200003|        194.420004|        199.229994|153832000|25.812148999999998|
|2010-02-04|        196.730003|        198.370001|        191.570005|        192.050003|189413000|         24.881912|
|2010-02-05|192.63000300000002|             196.0|        190.850002|        195.460001|212576700|25.323710000000002|
|2010-02-08|        195.690006|197.88000300000002|      

In [86]:
result = df.filter(df['Low'] ==  197.16).collect()

In [87]:
result

[Row(Date=datetime.date(2010, 1, 22), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [88]:
result[0]

Row(Date=datetime.date(2010, 1, 22), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [89]:
row =  result[0]

In [90]:
row.asDict()

{'Date': datetime.date(2010, 1, 22),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [91]:
row.asDict()['Volume']

220441900

###  GroupBy and Aggregate

In [92]:
df = spark.read.csv("sales_info.csv", inferSchema=True,header=True)

In [93]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [94]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [95]:
df.groupBy('Company').count()\
.withColumnRenamed('count',"Occurrence").show()

+-------+----------+
|Company|Occurrence|
+-------+----------+
|   APPL|         4|
|   GOOG|         3|
|     FB|         2|
|   MSFT|         3|
+-------+----------+



In [96]:
df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [97]:
df.agg({'Sales':'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [98]:
df.groupBy('Company').max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [99]:
group_data = df.groupBy("Company")

In [100]:
group_data.agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [101]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [102]:
df.select('Sales').count()

12

In [103]:
df.select(countDistinct('Sales')).withColumnRenamed('count(DISTINCT Sales)', 'Distinct').show()

+--------+
|Distinct|
+--------+
|      11|
+--------+



In [104]:
df.select(avg('Sales').alias('Average Sales')).show()

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [105]:
from pyspark.sql.functions import format_number

In [106]:
sales_std = df.select(stddev('Sales').alias('std'))

In [107]:
sales_std.show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



In [108]:
sales_std.select(format_number('std',2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



In [109]:
df.orderBy('Company').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
+-------+-------+-----+



In [110]:
df.orderBy(df['Company'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



### Missing Data

In [112]:
df = spark.read.csv("ContainsNull.csv", header=True, inferSchema=
                    True)

In [113]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [114]:
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [115]:
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [116]:
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [117]:
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [118]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [119]:
df.na.fill('Fill value for string').show()

+----+--------------------+-----+
|  Id|                Name|Sales|
+----+--------------------+-----+
|emp1|                John| NULL|
|emp2|Fill value for st...| NULL|
|emp3|Fill value for st...|345.0|
|emp4|               Cindy|456.0|
+----+--------------------+-----+



In [120]:
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| NULL|  0.0|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [121]:
df.na.fill('No Name', subset=["Name"]).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| NULL|
|emp2|No Name| NULL|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [122]:
from pyspark.sql.functions import mean

In [123]:
mean_val = df.select(mean(df['Sales'])).collect()

In [124]:
mean_val

[Row(avg(Sales)=400.5)]

In [126]:
mean_val[0].asDict()

{'avg(Sales)': 400.5}

In [127]:
mean_val = mean_val[0].asDict()

In [129]:
mean_sales = mean_val['avg(Sales)']

In [130]:
df.na.fill(mean_sales, ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| NULL|400.5|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [131]:
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0], ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| NULL|400.5|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



###  Dates and Timestamps

In [132]:
df = spark.read.csv("appl_stock.csv", inferSchema=True, header=True)

In [133]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [134]:
df.show(1)

+----------+----------+----------+------------------+----------+---------+---------+
|      Date|      Open|      High|               Low|     Close|   Volume|Adj Close|
+----------+----------+----------+------------------+----------+---------+---------+
|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|27.727039|
+----------+----------+----------+------------------+----------+---------+---------+
only showing top 1 row



In [135]:
df.head()

Row(Date=datetime.date(2010, 1, 4), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [136]:
df.select(['Date','Open']).show()

+----------+------------------+
|      Date|              Open|
+----------+------------------+
|2010-01-04|        213.429998|
|2010-01-05|        214.599998|
|2010-01-06|        214.379993|
|2010-01-07|            211.75|
|2010-01-08|        210.299994|
|2010-01-11|212.79999700000002|
|2010-01-12|209.18999499999998|
|2010-01-13|        207.870005|
|2010-01-14|210.11000299999998|
|2010-01-15|210.92999500000002|
|2010-01-19|        208.330002|
|2010-01-20|        214.910006|
|2010-01-21|        212.079994|
|2010-01-22|206.78000600000001|
|2010-01-25|202.51000200000001|
|2010-01-26|205.95000100000001|
|2010-01-27|        206.849995|
|2010-01-28|        204.930004|
|2010-01-29|        201.079996|
|2010-02-01|192.36999699999998|
+----------+------------------+
only showing top 20 rows



In [137]:
from pyspark.sql.functions import (dayofmonth, hour, dayofyear,
                                   month, year, weekofyear,
                                   format_number, date_format)

In [138]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [139]:
df.select(hour(df['Date'])).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [141]:
df.select(month(df['Date'])).show()

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
+-----------+
only showing top 20 rows



In [142]:
df.select(year(df['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [143]:
df = df.withColumn('Year',year(df['Date']))

In [145]:
df = df.withColumn('Month',month(df['Date']))

In [146]:
df.show(3)

+----------+----------+----------+------------------+----------+---------+------------------+----+-----+
|      Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|Year|Month|
+----------+----------+----------+------------------+----------+---------+------------------+----+-----+
|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|2010|    1|
|2010-01-05|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|2010|    1|
|2010-01-06|214.379993|    215.23|        210.750004|210.969995|138040000|27.333178000000004|2010|    1|
+----------+----------+----------+------------------+----------+---------+------------------+----+-----+
only showing top 3 rows



In [148]:
df.groupBy("Year").mean().show()

+----+------------------+------------------+------------------+------------------+--------------------+------------------+---------+------------------+
|Year|         avg(Open)|         avg(High)|          avg(Low)|        avg(Close)|         avg(Volume)|    avg(Adj Close)|avg(Year)|        avg(Month)|
+----+------------------+------------------+------------------+------------------+--------------------+------------------+---------+------------------+
|2015|120.17575393253965|121.24452385714291| 118.8630954325397|120.03999980555547|  5.18378869047619E7|115.96740080555561|   2015.0| 6.567460317460317|
|2013| 473.1281355634922| 477.6389272301587|468.24710264682557| 472.6348802857143|          1.016087E8| 62.61798788492063|   2013.0|6.5436507936507935|
|2014| 295.1426195357143|297.56103184523823| 292.9949599801587| 295.4023416507935| 6.315273055555555E7| 87.63583323809523|   2014.0| 6.551587301587301|
|2012|     576.652720788| 581.8254008040001| 569.9211606079999| 576.0497195640002|      

In [150]:
average_close_by_year = df.groupBy("Year").mean().select(['Year',"avg(Close)"])

In [151]:
average_close_by_year.show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



In [162]:
new_result = average_close_by_year.orderBy(df["Year"].desc()).withColumnRenamed("avg(Close)", "Average Closing Price")

In [164]:
new_result.select(['Year',format_number('Average Closing Price', 3).alias('Average Closing Price')]).show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2016|              104.604|
|2015|              120.040|
|2014|              295.402|
|2013|              472.635|
|2012|              576.050|
|2011|              364.004|
|2010|              259.842|
+----+---------------------+

