In [25]:
import findspark
findspark.init()

from pyspark.sql import SparkSession 
from pyspark import SparkContext
sc = SparkContext.getOrCreate()


In [26]:
spark = SparkSession.builder.appName("DataFrame Basics").getOrCreate()

In [31]:
df = spark.read.json('people.json')

In [32]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [33]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [34]:
df.columns

['age', 'name']

In [35]:
df.describe()

DataFrame[summary: string, age: string, name: string]

In [36]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



Manuvally set the DataFrame schema

In [37]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [38]:
data_schema = [StructField('age', IntegerType(), True), 
                StructField('name', StringType(),True)]

In [40]:
final_struc = StructType(fields=data_schema)

In [41]:
df = spark.read.json('people.json', schema = final_struc)

In [42]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [43]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [45]:
type(df['age'])    # return the column 

pyspark.sql.column.Column

In [47]:
df.select('age').show()  # return a DataFrame ontains a single column

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [50]:
type(df.select('age'))

pyspark.sql.dataframe.DataFrame

In [51]:
df.head(2) # it will get a list of ROW Objects

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [52]:
df.head(2)[0]

Row(age=None, name='Michael')

In [53]:
type(df.head(2)[0])  # ROW Object 

pyspark.sql.types.Row

In [54]:
df.select(['age','name']).show()   # select multiple columns

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [55]:
df.withColumn('newage', df['age']).show()  # add a new column using exisisting column(simply exisisting column data copied into newly added column)

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+



In [56]:
df.withColumn('doubleage', df['age']*2).show()

+----+-------+---------+
| age|   name|doubleage|
+----+-------+---------+
|null|Michael|     null|
|  30|   Andy|       60|
|  19| Justin|       38|
+----+-------+---------+



In [57]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [58]:
df.withColumnRenamed('age','my_new_age').show()  # Renaming the column 

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



In [60]:
df.createOrReplaceTempView('people') # you need to register a DataFrame as temparary view to perform sql queries over the DataFrame

In [62]:
results = spark.sql("select * from people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [63]:
new_results = spark.sql("select * from people where age =30").show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



Spark DataFrame Basic Operations

In [66]:
df = spark.read.csv("appl_stock.csv", inferSchema=True, header=True)

In [68]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [69]:
df.show(5)

+-------------------+----------+----------+------------------+------------------+---------+------------------+
|               Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
o

In [70]:
df.head(3)[0]

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [74]:
df.filter("Close < 500").show(5)

+-------------------+----------+----------+------------------+------------------+---------+------------------+
|               Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
o

In [76]:
df.filter("Close < 500").select(['Open','Close']).show(5)

+----------+------------------+
|      Open|             Close|
+----------+------------------+
|213.429998|        214.009998|
|214.599998|        214.379993|
|214.379993|        210.969995|
|    211.75|            210.58|
|210.299994|211.98000499999998|
+----------+------------------+
only showing top 5 rows



In [79]:
df.filter(df['Close'] < 500).select("Volume").show(5)

+---------+
|   Volume|
+---------+
|123432400|
|150476200|
|138040000|
|119282800|
|111902700|
+---------+
only showing top 5 rows



In [83]:
# multiple conditional data 

df.filter((df['Close'] < 200 ) & (df['Open'] > 200)).show(5)

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [84]:
df.filter(df['Low'] == 197.16).show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



In [86]:
result = df.filter(df['Low'] == 197.16).collect()

In [87]:
result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [89]:
row = result[0]

In [90]:
row.asDict()

{'Adj Close': 25.620401,
 'Close': 197.75,
 'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'High': 207.499996,
 'Low': 197.16,
 'Open': 206.78000600000001,
 'Volume': 220441900}

In [91]:
row.asDict()['Volume']

220441900

 Spark DataFrames GroupBy and Aggregate

In [95]:
df = spark.read.csv('sales_info.csv', inferSchema=True, header= True)

In [96]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [97]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [98]:
df.groupBy("Company")

<pyspark.sql.group.GroupedData at 0x112a3ee48>

In [100]:
df.groupBy("Company").mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [101]:
df.groupBy("Company").max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [102]:
df.groupBy("Company").min().show()

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



In [103]:
df.groupBy("Company").count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [104]:
df.agg({'Sales': 'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [105]:
df.agg({'Sales': 'max'}).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [106]:
df.agg({'Sales': 'min'}).show()

+----------+
|min(Sales)|
+----------+
|     120.0|
+----------+



In [107]:
group_data = df.groupBy("Company")

In [108]:
group_data.agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [109]:
from pyspark.sql.functions import countDistinct, avg, stddev    #  SQL functions 

In [110]:
df.select(countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [112]:
df.select(avg('Sales').alias('Avarage sales')).show()

+-----------------+
|    Avarage sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [113]:
df.select(stddev("Sales")).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [114]:
from pyspark.sql.functions import format_number

In [115]:
sales_std = df.select(stddev('Sales').alias('std'))

In [118]:
sales_std.select(format_number('std', 2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



In [121]:
df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [122]:
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



Missing Data in DataFrames

In [123]:
df = spark.read.csv('ContainsNull.csv', inferSchema=True, header=True)

In [124]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [125]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [126]:
#Droping the missing data

df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [127]:
df.na.drop(thresh=2).show() # this means ROW atleast has 2 non-null values

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [128]:
df.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [129]:
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [131]:
df.na.drop(subset=['Sales']).show()  # it delets the null values from sales column

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [132]:
# Fill the Missing values

df.na.fill('FILL VALUES').show()

+----+-----------+-----+
|  Id|       Name|Sales|
+----+-----------+-----+
|emp1|       John| null|
|emp2|FILL VALUES| null|
|emp3|FILL VALUES|345.0|
|emp4|      Cindy|456.0|
+----+-----------+-----+



In [133]:
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [135]:
df.na.fill('NO Name', subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|NO Name| null|
|emp3|NO Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [137]:
from pyspark.sql.functions import mean

In [138]:
mean_val = df.select(mean(df['Sales'])).collect()

In [139]:
mean_val

[Row(avg(Sales)=400.5)]

In [142]:
mean_sales = mean_val[0][0]

In [143]:
df.na.fill(mean_sales,['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [145]:
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0],['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



Dates and Timestamps

In [146]:
df = spark.read.csv('appl_stock.csv', inferSchema=True, header=True)

In [147]:
df.head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [148]:
df.show(5)

+-------------------+----------+----------+------------------+------------------+---------+------------------+
|               Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
o

In [150]:
df.select(['Date','Open']).show(5)

+-------------------+----------+
|               Date|      Open|
+-------------------+----------+
|2010-01-04 00:00:00|213.429998|
|2010-01-05 00:00:00|214.599998|
|2010-01-06 00:00:00|214.379993|
|2010-01-07 00:00:00|    211.75|
|2010-01-08 00:00:00|210.299994|
+-------------------+----------+
only showing top 5 rows



In [181]:
from pyspark.sql.functions import (dayofmonth,hour, dayofyear, month,year,weekofyear,format_number, date_format)

In [182]:
df.select(dayofmonth(df['Date'])).show(5)

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
+----------------+
only showing top 5 rows



In [183]:
df.select(hour(df['Date'])).show(5)

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 5 rows



In [184]:
df.select(month(df['Date'])).show(5)

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
+-----------+
only showing top 5 rows



In [185]:
df.select(year(df['Date'])).show(5)

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 5 rows



In [186]:
newdf = df.withColumn("Year", year(df['Date']))

In [187]:
result = newdf.groupBy("Year").mean().select(["Year","avg(Close)"])

In [188]:
result

DataFrame[Year: int, avg(Close): double]

In [189]:
new_result = result.withColumnRenamed("avg(Close)","Average Closing Price")

In [190]:
new_result.select(['Year',format_number('Average Closing Price',2).alias("Avg Close")]).show()

+----+---------+
|Year|Avg Close|
+----+---------+
|2015|   120.04|
|2013|   472.63|
|2014|   295.40|
|2012|   576.05|
|2016|   104.60|
|2010|   259.84|
|2011|   364.00|
+----+---------+



DataFrame Project

In [193]:
df = spark.read.csv('walmart_stock.csv', inferSchema=True, header=True)

In [195]:
df.show(5)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [196]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [197]:
df.columns


['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [198]:
df.head(5)

[Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
 Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922),
 Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)]

In [201]:
df.describe().show(5)

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

In [207]:
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [208]:
from pyspark.sql.functions import format_number

In [209]:
result = df.describe()

In [217]:
result.select(result['summary'], format_number(result['Open'].cast('float'),2).alias('Open'),
                     format_number(result['High'].cast('float'),2).alias('High'),
             format_number(result['Low'].cast('float'),2).alias('Low'),
             format_number(result['Close'].cast('float'),2).alias('Close'),
             format_number(result['Volume'].cast('int'),2).alias('Volume')).show()

+-------+--------+--------+--------+--------+-------------+
|summary|    Open|    High|     Low|   Close|       Volume|
+-------+--------+--------+--------+--------+-------------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|     1,258.00|
|   mean|   72.36|   72.84|   71.92|   72.39| 8,222,093.00|
| stddev|    6.77|    6.77|    6.74|    6.76| 4,519,780.00|
|    min|   56.39|   57.06|   56.30|   56.42| 2,094,900.00|
|    max|   90.80|   90.97|   89.25|   90.47|80,898,100.00|
+-------+--------+--------+--------+--------+-------------+



In [221]:
df2 = df.withColumn("HV Ratio", df['High']/df['Volume'])

In [222]:
df2.select('HV Ratio').show(5)

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
+--------------------+
only showing top 5 rows



In [227]:
df.orderBy(df['High'].desc()).head(1)[0][0]  # what day had the Peak High in Price ?

datetime.datetime(2015, 1, 13, 0, 0)

In [230]:
# what is the mean of the close cloumn?
from pyspark.sql.functions import mean,max,min
df.select(mean("Close")).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [231]:
# what is the max and min of the Volume Column

df.select(max('Volume'), min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



In [234]:
# How many days was the Close lower than 60 dollars
df.filter('Close < 60').count()

81

In [235]:
df.filter(df['Close'] < 60).count()

81

In [236]:
from pyspark.sql.functions import count

In [237]:
result = df.filter(df['Close'] < 60)

In [238]:
result.select(count('Close')).show()

+------------+
|count(Close)|
+------------+
|          81|
+------------+



In [241]:
# what percentage of the time was the High grate than 80 dollars

(df.filter(df['High']>80).count() / df.count())*100

9.141494435612083

In [242]:
# what is the pearson correlation between High and Volume

from pyspark.sql.functions import corr

In [243]:
df.select(corr('High','Volume')).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



In [244]:
# what is the max High per the year

from pyspark.sql.functions import year

yeardf = df.withColumn("Year", year(df['Date']))


In [245]:
max_df = yeardf.groupBy('Year').max()

In [246]:
max_df.select('Year', 'max(High)').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



In [247]:
# what is the average Close for each Calendar Month ?
from pyspark.sql.functions import month

In [248]:
monthdf = df.withColumn('Month', month('Date'))

In [249]:
monthavgs = monthdf.select(['Month','Close']).groupBy('Month').mean()

In [251]:
monthavgs.select('Month', 'avg(Close)').orderBy('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

