In [11]:
import findspark
# init() must run before `import pyspark` so the local Spark install can be located.
findspark.init()
import pyspark
# Last expression: display the detected Spark home directory.
findspark.find()

'C:\\spark\\spark-2.4.5-bin-hadoop2.7'

## Read in DataFrame

In [12]:
from pyspark.sql import SparkSession

In [13]:
# Reuse an existing session if one is running; otherwise start one named "Basics".
spark = (
    SparkSession.builder
    .appName("Basics")
    .getOrCreate()
)

In [14]:
# Load people.json; column types are inferred (age comes back as long, see schema below).
df = spark.read.format("json").load("people.json")

In [15]:
# Print the DataFrame's rows as a text table.
df.show()

+---+-------+
|age|   name|
+---+-------+
| 23|Gilbert|
| 19|  Alexa|
| 27|    May|
| 31|Deloise|
+---+-------+



In [16]:
# Show the inferred schema: JSON integers were inferred as `long`.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [17]:
# Column names as a plain Python list.
df.columns

['age', 'name']

In [18]:
# describe() returns a DataFrame; only its repr (the schema) is displayed, not the statistics.
df.describe()

DataFrame[summary: string, age: string, name: string]

In [19]:
# Call .show() to actually display count/mean/stddev/min/max per column.
df.describe().show()

+-------+-----------------+-----+
|summary|              age| name|
+-------+-----------------+-----+
|  count|                4|    4|
|   mean|             25.0| null|
| stddev|5.163977794943222| null|
|    min|               19|Alexa|
|    max|               31|  May|
+-------+-----------------+-----+



## Change DataFrame Schema

In [23]:
from pyspark.sql.types import (StructType, StructField, 
                               IntegerType, StringType)

In [24]:
# Explicit schema: age as a 32-bit integer instead of the inferred long; both columns nullable.
data_schema = [
    StructField('age', IntegerType(), nullable=True),
    StructField('name', StringType(), nullable=True),
]

In [26]:
# Wrap the field list into a StructType usable as a reader schema.
final_struct = StructType(data_schema)

In [27]:
# Re-read people.json, this time enforcing the explicit schema instead of inferring one.
df = spark.read.schema(final_struct).json("people.json")

In [28]:
# Same data as before; only the declared types changed.
df.show()

+---+-------+
|age|   name|
+---+-------+
| 23|Gilbert|
| 19|  Alexa|
| 27|    May|
| 31|Deloise|
+---+-------+



In [29]:
# age is now `integer`, as declared in final_struct.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



## DataFrame Row and Column

In [30]:
# Indexing a DataFrame by name yields a Column expression, not data.
df['age']

Column<b'age'>

In [31]:
# Confirm the indexed object is a pyspark Column.
type(df['age'])

pyspark.sql.column.Column

In [36]:
# select() by contrast returns a new DataFrame.
type(df.select('age'))

pyspark.sql.dataframe.DataFrame

In [37]:
# Project the single age column and display it.
df.select(df['age']).show()

+---+
|age|
+---+
| 23|
| 19|
| 27|
| 31|
+---+



In [38]:
# head(n) brings the first n rows to the driver as a list of Row objects.
df.head(2)

[Row(age=23, name='Gilbert'), Row(age=19, name='Alexa')]

In [42]:
# Each element returned by head() is a pyspark Row.
type(df.head(2)[0])

pyspark.sql.types.Row

In [44]:
# select() also accepts several column names as separate arguments.
df.select('age', 'name').show()

+---+-------+
|age|   name|
+---+-------+
| 23|Gilbert|
| 19|  Alexa|
| 27|    May|
| 31|Deloise|
+---+-------+



In [45]:
# Add a column that duplicates age; returns a new DataFrame, df is not modified.
df.withColumn('newage', df.age).show()

+---+-------+------+
|age|   name|newage|
+---+-------+------+
| 23|Gilbert|    23|
| 19|  Alexa|    19|
| 27|    May|    27|
| 31|Deloise|    31|
+---+-------+------+



In [46]:
# Derived column computed from an arithmetic Column expression.
df.withColumn('double_age', df.age * 2).show()

+---+-------+----------+
|age|   name|double_age|
+---+-------+----------+
| 23|Gilbert|        46|
| 19|  Alexa|        38|
| 27|    May|        54|
| 31|Deloise|        62|
+---+-------+----------+



In [47]:
# df still has only age/name: the withColumn results above were never assigned.
df.show()

+---+-------+
|age|   name|
+---+-------+
| 23|Gilbert|
| 19|  Alexa|
| 27|    May|
| 31|Deloise|
+---+-------+



In [48]:
# Rename a column in a new DataFrame (not assigned, so df keeps 'age').
df.withColumnRenamed('age', 'my_new_age').show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|        23|Gilbert|
|        19|  Alexa|
|        27|    May|
|        31|Deloise|
+----------+-------+



## Convert DataFrame to SQL View

In [49]:
# Register df under the name 'people' so spark.sql queries can reference it.
df.createOrReplaceTempView('people')

In [50]:
# spark.sql returns a DataFrame; nothing is displayed until .show().
results = spark.sql("SELECT * FROM people")

In [51]:
# Display the SQL query result.
results.show()

+---+-------+
|age|   name|
+---+-------+
| 23|Gilbert|
| 19|  Alexa|
| 27|    May|
| 31|Deloise|
+---+-------+



In [52]:
# SQL WHERE clause against the registered temp view.
new_results = spark.sql("SELECT * FROM people WHERE age=31")

In [53]:
# Only the row with age 31 remains.
new_results.show()

+---+-------+
|age|   name|
+---+-------+
| 31|Deloise|
+---+-------+



## Spark DataFrame Basic Operations

In [54]:
# Load the AAPL price history; first line is the header, column types are inferred.
df = spark.read.csv(
    "AAPL.csv",
    header=True,
    inferSchema=True,
)

In [55]:
# Display the first rows of the price table.
df.show()

+-------------------+----------+----------+----------+----------+----------+--------+
|               Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+-------------------+----------+----------+----------+----------+----------+--------+
|2019-05-13 00:00:00|187.710007|189.479996|182.850006|185.720001|183.529678|57430600|
|2019-05-14 00:00:00|186.410004|189.699997|185.410004|188.660004|186.434998|36529700|
|2019-05-15 00:00:00|186.270004|    191.75|186.020004|190.919998|188.668335|26544700|
|2019-05-16 00:00:00|189.910004|192.470001|188.839996|190.080002|187.838257|33031400|
|2019-05-17 00:00:00|186.929993|190.899994|186.759995|     189.0|186.770996|32879100|
|2019-05-20 00:00:00|183.520004|184.350006|180.279999|183.089996|180.930695|38612300|
|2019-05-21 00:00:00|185.220001|     188.0|184.699997|186.600006|184.399307|28364800|
|2019-05-22 00:00:00|184.660004|185.710007|182.550003|182.779999|180.624329|29748600|
|2019-05-23 00:00:00|179.800003|180.539993|177.809998|

In [59]:
# Second Row of the first three; Date was inferred as a timestamp.
df.head(3)[1]

Row(Date=datetime.datetime(2019, 5, 14, 0, 0), Open=186.410004, High=189.699997, Low=185.410004, Close=188.660004, Adj Close=186.434998, Volume=36529700)

In [64]:
# where() is an alias of filter(); the condition is a SQL expression string.
df.where('Close > 180').show()

+-------------------+----------+----------+----------+----------+----------+--------+
|               Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+-------------------+----------+----------+----------+----------+----------+--------+
|2019-05-13 00:00:00|187.710007|189.479996|182.850006|185.720001|183.529678|57430600|
|2019-05-14 00:00:00|186.410004|189.699997|185.410004|188.660004|186.434998|36529700|
|2019-05-15 00:00:00|186.270004|    191.75|186.020004|190.919998|188.668335|26544700|
|2019-05-16 00:00:00|189.910004|192.470001|188.839996|190.080002|187.838257|33031400|
|2019-05-17 00:00:00|186.929993|190.899994|186.759995|     189.0|186.770996|32879100|
|2019-05-20 00:00:00|183.520004|184.350006|180.279999|183.089996|180.930695|38612300|
|2019-05-21 00:00:00|185.220001|     188.0|184.699997|186.600006|184.399307|28364800|
|2019-05-22 00:00:00|184.660004|185.710007|182.550003|182.779999|180.624329|29748600|
|2019-06-05 00:00:00|184.279999|184.990005|181.139999|

In [69]:
# Filter rows, then keep only the Open and Close columns.
df.where('Close > 180').select('Open', 'Close').show()

+----------+----------+
|      Open|     Close|
+----------+----------+
|187.710007|185.720001|
|186.410004|188.660004|
|186.270004|190.919998|
|189.910004|190.080002|
|186.929993|     189.0|
|183.520004|183.089996|
|185.220001|186.600006|
|184.660004|182.779999|
|184.279999|182.539993|
|183.080002|185.220001|
|186.509995|190.149994|
|191.809998|192.580002|
|194.860001|194.809998|
|193.949997|194.190002|
|194.699997|194.149994|
|191.550003|192.740005|
|192.899994|193.889999|
|196.050003|198.449997|
|199.679993|197.869995|
|200.369995|199.460007|
+----------+----------+
only showing top 20 rows



In [70]:
# Same query, but fetch the first five Rows to the driver instead of printing a table.
df.where('Close > 180').select('Open', 'Close').take(5)

[Row(Open=187.710007, Close=185.720001),
 Row(Open=186.410004, Close=188.660004),
 Row(Open=186.270004, Close=190.919998),
 Row(Open=189.910004, Close=190.080002),
 Row(Open=186.929993, Close=189.0)]

In [71]:
# Filter with a Column expression instead of a SQL string.
df.filter(df.Close < 500).select('Volume').show()

+--------+
|  Volume|
+--------+
|57430600|
|36529700|
|26544700|
|33031400|
|32879100|
|38612300|
|28364800|
|29748600|
|36529700|
|23714700|
|27948200|
|28481200|
|21218400|
|27043600|
|40396100|
|30968000|
|29773400|
|22526300|
|30684400|
|26220900|
+--------+
only showing top 20 rows



In [72]:
# Combine Column conditions with `&` (Python `and` does not work on Columns).
close_below_200 = df['Close'] < 200
open_above_200 = df['Open'] > 200
df.filter(close_below_200 & open_above_200).show()

+-------------------+----------+----------+----------+----------+----------+--------+
|               Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+-------------------+----------+----------+----------+----------+----------+--------+
|2019-06-20 00:00:00|200.369995|200.610001|198.029999|199.460007| 197.10762|21514000|
|2019-06-27 00:00:00|200.289993|201.570007|199.570007|199.740005|197.384323|20899700|
+-------------------+----------+----------+----------+----------+----------+--------+



In [73]:
# `~` negates a Column condition.
close_below_200 = df['Close'] < 200
open_not_above_200 = ~(df['Open'] > 200)
df.filter(close_below_200 & open_not_above_200).show()

+-------------------+----------+----------+----------+----------+----------+--------+
|               Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+-------------------+----------+----------+----------+----------+----------+--------+
|2019-05-13 00:00:00|187.710007|189.479996|182.850006|185.720001|183.529678|57430600|
|2019-05-14 00:00:00|186.410004|189.699997|185.410004|188.660004|186.434998|36529700|
|2019-05-15 00:00:00|186.270004|    191.75|186.020004|190.919998|188.668335|26544700|
|2019-05-16 00:00:00|189.910004|192.470001|188.839996|190.080002|187.838257|33031400|
|2019-05-17 00:00:00|186.929993|190.899994|186.759995|     189.0|186.770996|32879100|
|2019-05-20 00:00:00|183.520004|184.350006|180.279999|183.089996|180.930695|38612300|
|2019-05-21 00:00:00|185.220001|     188.0|184.699997|186.600006|184.399307|28364800|
|2019-05-22 00:00:00|184.660004|185.710007|182.550003|182.779999|180.624329|29748600|
|2019-05-23 00:00:00|179.800003|180.539993|177.809998|

In [75]:
# Select rows whose Low equals an exact value.
# NOTE(review): exact equality on double columns is fragile; it matches here only because the
# literal equals the parsed CSV value. Prefer a tolerance/range filter for real data.
df.filter(df['Low'] == 176.669998).show()

+-------------------+----------+----------+----------+----------+----------+--------+
|               Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+-------------------+----------+----------+----------+----------+----------+--------+
|2019-05-30 00:00:00|177.949997|179.229996|176.669998|178.300003|176.197189|21218400|
+-------------------+----------+----------+----------+----------+----------+--------+



In [82]:
# collect() returns the matching rows to the driver as a list of Row objects.
# NOTE(review): same exact-float-equality caveat as the previous cell.
result = df.filter(df['Low'] == 176.669998).collect()

In [83]:
# A one-element list of Row.
result

[Row(Date=datetime.datetime(2019, 5, 30, 0, 0), Open=177.949997, High=179.229996, Low=176.669998, Close=178.300003, Adj Close=176.197189, Volume=21218400)]

In [85]:
# Take the single matching Row.
row = result[0]

In [89]:
# Convert the Row to a plain dict keyed by column name.
row.asDict()

{'Date': datetime.datetime(2019, 5, 30, 0, 0),
 'Open': 177.949997,
 'High': 179.229996,
 'Low': 176.669998,
 'Close': 178.300003,
 'Adj Close': 176.197189,
 'Volume': 21218400}

In [90]:
# Look up a single field via the dict form of the Row.
row_as_dict = row.asDict()
row_as_dict['Volume']

21218400

## GroupBy and Aggregate Functions

In [91]:
# Load per-person sales figures; header row present, types inferred (Sales -> double).
df = spark.read.csv(
    'sales_info.csv',
    header=True,
    inferSchema=True,
)

In [92]:
# Display the sales table.
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [93]:
# Company/Person are strings, Sales is a double.
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [94]:
# groupBy returns a GroupedData object, not a DataFrame; an aggregation must follow.
df.groupBy('Company')

<pyspark.sql.group.GroupedData at 0x1dd4c896508>

In [101]:
# Average of every numeric column per company; the commented lines show other aggregations.
df.groupBy('Company').mean().show()
# df.groupBy('Company').max().show()
# df.groupBy('Company').min().show()
# df.groupBy('Company').count().show()
# df.groupBy('Company').sum().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [105]:
# agg on the whole DataFrame (no grouping): dict maps column name -> aggregate function name.
df.agg({'Sales': 'sum'}).show()
# df.agg({'Sales': 'min'}).show()
# df.agg({'Sales': 'max'}).show()
# df.agg({'Sales': 'count'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [106]:
# Keep the GroupedData handle so different aggregations can be applied to it.
group_data = df.groupby('Company')

In [107]:
# Total sales per company (output column is named sum(Sales)).
group_data.sum('Sales').show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [108]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [114]:
# Number of distinct Sales values, first with Spark's default column name...
df.select(countDistinct('Sales')).show()
# df.select(avg('Sales')).show()
# df.select(stddev('Sales')).show()

# ...then with alias() giving the output column a readable name.
df.select(countDistinct('Sales').alias('Sales count')).show()
# df.select(avg('Sales').alias('Sales Avg.')).show()
# df.select(stddev('Sales').alias('Sales STD')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+

+-----------+
|Sales count|
+-----------+
|         11|
+-----------+



In [115]:
from pyspark.sql.functions import format_number

In [118]:
# Compute the standard deviation of Sales, then display it rounded to two decimals.
sales_std = df.select(stddev('Sales').alias('std'))
formatted_std = format_number(sales_std['std'], 2).alias('std')
sales_std.select(formatted_std).show()

+------+
|   std|
+------+
|250.09|
+------+



In [121]:
# Sort ascending (default), then descending.
df.orderBy('Sales').show()
df.orderBy('Sales', ascending=False).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



## Missing Data

In [122]:
# Load a small table with missing values to demonstrate null handling.
df = spark.read.csv(
    'ContainsNull.csv',
    header=True,
    inferSchema=True,
)

In [123]:
# Display the table with its null values.
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [125]:
# With no arguments, drop every row that contains at least one null.
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [127]:
# keep only rows that have at least 2 NON-null values (thresh counts non-nulls);
# emp2 has a single non-null value, so it is the row dropped here
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [131]:
# how defaults to 'any': drop a row if any of its values is null;
# how='all' drops a row only when every value is null (none here, so all rows remain)
df.na.drop(how='any').show()
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [133]:
# dropna() is the DataFrame-level alias of na.drop(); only nulls in Sales are considered.
df.dropna(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [137]:
# The fill value's type selects the columns: a string fills string columns, a number fills numeric ones.
df.fillna('FILL VALUE').show()
df.fillna(0).show()

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [139]:
# Restrict the fill to the Name column.
df.fillna('No Name', subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [140]:
from pyspark.sql.functions import mean

In [141]:
# Mean of the non-null Sales values, collected as a one-row list.
mean_val = df.select(mean('Sales')).collect()

In [146]:
# Unwrap the scalar: first Row, first field.
first_row = mean_val[0]
mean_sales = first_row[0]
print(mean_sales)

400.5


In [147]:
# Impute missing Sales with the column mean.
df.fillna(mean_sales, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [150]:
# short form for filling values: compute the Sales mean inline (same value as mean_sales)
# and use it to fill nulls in Sales in a single expression
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0], subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



## Dates and Timestamps

In [151]:
# Reload AAPL prices (df was rebound to other datasets above).
df = spark.read.csv(
    'AAPL.csv',
    header=True,
    inferSchema=True,
)

In [152]:
# Date is inferred as a datetime, which the date functions below operate on.
df.head(1)

[Row(Date=datetime.datetime(2019, 5, 13, 0, 0), Open=187.710007, High=189.479996, Low=182.850006, Close=185.720001, Adj Close=183.529678, Volume=57430600)]

In [154]:
# Optional: uncomment to preview the Date and Open columns.
# df.select(['Date', 'Open']).show()

In [160]:
from pyspark.sql.functions import (dayofmonth, hour,
                                   dayofyear, month,
                                   year, weekofyear,
                                   format_number, date_format)

In [163]:
# Optional: uncomment any line to see one date-part extraction applied to Date.
# df.select(dayofmonth(df['Date'])).show()
# df.select(hour(df['Date'])).show()
# df.select(dayofyear(df['Date'])).show()
# df.select(month(df['Date'])).show()
# df.select(year(df['Date'])).show()
# df.select(weekofyear(df['Date'])).show()

In [172]:
# Add the calendar year extracted from Date as a new column.
new_df = df.withColumn('Year', year('Date'))

In [173]:
# Average closing price per year. Aggregate only Close rather than every numeric column;
# the output column is still named avg(Close), and select() fixes the column order.
result = new_df.groupBy('Year').mean('Close').select(['Year', 'avg(Close)'])

In [179]:
new_result = result.withColumnRenamed('avg(Close)', 'Avergae Closing Price')

In [183]:
new_result.select(['Year', format_number('Avergae Closing Price', 2).alias('Avg Close')]).show()

+----+---------+
|Year|Avg Close|
+----+---------+
|2019|   223.93|
|2020|   289.62|
+----+---------+

