In [1]:
#SparkSession is the entry point to Spark SQL. It is the very first object 
# you have to create while developing Spark SQL applications.
from pyspark.sql import SparkSession

In [2]:
#You use the SparkSession.builder method to create an instance of SparkSession
spark = SparkSession.builder.appName('Demo').getOrCreate()

In [3]:
# Read the data from the csv file into a dataframe
df = spark.read.csv('appl_stock.csv', inferSchema=True, header=True)

In [4]:
# Dispaly the dataframe
df.show()

+--------------------+------------------+------------------+------------------+------------------+---------+------------------+
|                Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+--------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:...|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:...|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:...|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:...|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:...|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902

In [5]:
#Display the schema of the dataframe
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [None]:
# To see what features are in
df.columns

In [None]:
# To get a statistical summary of the data
df.describe()

In [None]:
df.describe().show()

In [None]:
df.head(3)[0]

In [None]:
# Select data with closing price less than 500. 
df.filter  ("Close<500").show()

In [None]:
# this is similar to SQL syntax, selecting the two named columns from the data
# you can use filter() to return only the rows that match the given criteria
df.filter  ("Close<500").select(['Open','Close']).show()

In [None]:
df.filter(df['Close']< 500).show()

In [None]:
df.filter(df['Close']< 500).select ('Volume').show()

In [None]:
#df.filter ((df['Close']<200 ) & (df['Open']>200)).show()
df.filter ((df['Close']<200 ) & ~(df['Open']>200)).show()
df.filter(df['Low'] == 197.16).show()

In [None]:
# you need to collect the filtered results for further processing
r =df.filter(df['Low'] == 197.16).collect()
r[0]

In [None]:
# turn the results into a dictionary so you can retrieve value for certain field easily
r[0].asDict()
#r[0].asDict()['Volume']

In [None]:
# you can also use where to do something similar to filter()
df.where("Close<200").sort("Close").show()

In [None]:
df1=spark.read.csv("ContainsNull.csv", inferSchema=True, header = True)

In [None]:
df1.show()

In [None]:
df1.show()

In [None]:
# Drop missing data
df1.na.drop().show()

In [None]:
df1.na.drop(thresh=2).show()

In [None]:
df1.na.drop(how='all').show()

In [None]:
df1.na.drop(how='any').show()

In [None]:
df1.na.drop(subset=['Sales']).show()

In [115]:
df1.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [117]:
df1.na.fill('No Name', subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [118]:
from pyspark.sql.functions import mean

In [122]:
mean = df1.select(mean(df1['Sales'])).collect()

In [123]:
mean

[Row(avg(Sales)=400.5)]

In [124]:
mean[0]

Row(avg(Sales)=400.5)

In [125]:
mean[0][0]

400.5

In [126]:
sales_mean=mean[0][0]
df1.na.fill(sales_mean, ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [132]:
spark = SparkSession.builder.appName('dates').getOrCreate()

In [134]:
df = spark.read.csv('appl_stock.csv', header = True, inferSchema = True)

In [135]:
df.head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [136]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [137]:
df.select(['Date', 'Open']).show()

+--------------------+------------------+
|                Date|              Open|
+--------------------+------------------+
|2010-01-04 00:00:...|        213.429998|
|2010-01-05 00:00:...|        214.599998|
|2010-01-06 00:00:...|        214.379993|
|2010-01-07 00:00:...|            211.75|
|2010-01-08 00:00:...|        210.299994|
|2010-01-11 00:00:...|212.79999700000002|
|2010-01-12 00:00:...|209.18999499999998|
|2010-01-13 00:00:...|        207.870005|
|2010-01-14 00:00:...|210.11000299999998|
|2010-01-15 00:00:...|210.92999500000002|
|2010-01-19 00:00:...|        208.330002|
|2010-01-20 00:00:...|        214.910006|
|2010-01-21 00:00:...|        212.079994|
|2010-01-22 00:00:...|206.78000600000001|
|2010-01-25 00:00:...|202.51000200000001|
|2010-01-26 00:00:...|205.95000100000001|
|2010-01-27 00:00:...|        206.849995|
|2010-01-28 00:00:...|        204.930004|
|2010-01-29 00:00:...|        201.079996|
|2010-02-01 00:00:...|192.36999699999998|
+--------------------+------------

In [138]:
from pyspark.sql.functions import dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format

In [139]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [140]:
df.select(hour(df['Date'])).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [141]:
df.select(year(df['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [151]:
newdf=df.withColumn("Year", year(df['Date']))
r = newdf.groupBy("Year").mean().select(["Year", "avg(Close)"])
new = r.withColumnRenamed("avg(Close)", "Average Closing Price")
new.select(['Year',format_number('Average Closing Price',2).alias("Avg Close")]).show()

+----+---------+
|Year|Avg Close|
+----+---------+
|2015|   120.04|
|2013|   472.63|
|2014|   295.40|
|2012|   576.05|
|2016|   104.60|
|2010|   259.84|
|2011|   364.00|
+----+---------+

