In [0]:
from pyspark.sql import SparkSession

In [0]:
# Start a simple Spark Session
spark  = SparkSession.builder.appName('Exc').getOrCreate()

In [0]:
# Load the Walmart Stock CSV File, have Spark infer the data types.
df = sqlContext.sql('SELECT * FROM walmart_stock')

In [0]:
from pyspark.sql.functions import format_number,filter,cast,dayofyear,desc,mean,max,min,count

In [0]:
# What are the column names?
df.columns

Out[113]: ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [0]:
# What does the Schema look like?
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: float (nullable = true)



In [0]:
# Print out the first 5 columns.
for row in df.head(5):
    print (row)
    print('\n')

Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001220703125, High=61.060001373291016, Low=59.869998931884766, Close=60.33000183105469, Volume=12668800, Adj Close=52.61923599243164)


Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209999084472656, High=60.349998474121094, Low=59.470001220703125, Close=59.709999084472656, Volume=9593300, Adj Close=52.07847595214844)


Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998474121094, High=59.619998931884766, Low=58.369998931884766, Close=59.41999816894531, Volume=12768200, Adj Close=51.825538635253906)


Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.41999816894531, High=59.45000076293945, Low=58.869998931884766, Close=59.0, Volume=8069400, Adj Close=51.45922088623047)


Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029998779296875, High=59.54999923706055, Low=58.91999816894531, Close=59.18000030517578, Volume=6679300, Adj Close=51.616214752197266)




In [0]:
# Use describe() to learn about the DataFrame.
df.describe().show()

+-------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|             Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|             1258|             1258|             1258|             1258|             1258|             1258|
|   mean|72.35785375452572| 72.8393880756178|71.91860094964979|72.38844997363553|8222093.481717011|67.23883840200064|
| stddev|6.768090251767697|6.768186825250206|6.744075739203606|6.756859160119612|  4519780.8431556|6.722609385249684|
|    min|            56.39|            57.06|             56.3|            56.42|          2094900|         50.36369|
|    max|             90.8|            90.97|            89.25|            90.47|         80898100|        84.914215|
+-------+-----------------+-----------------+-----------

In [0]:
result = df.describe()

In [0]:
# There are too many decimal places for mean and stddev in the describe() dataframe. Format the numbers to just show up to two decimal places.
result.select(result['summary'],
              format_number(result['Open'].cast('float'),2).alias('Open'),
              format_number(result['High'].cast('float'),2).alias('High'),
              format_number(result['Low'].cast('float'),2).alias('Low'),
              format_number(result['Close'].cast('float'),2).alias('Close'),
              result['Volume'].cast('int').alias('Volume')).show()


+-------+--------+--------+--------+--------+--------+
|summary|    Open|    High|     Low|   Close|  Volume|
+-------+--------+--------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519780|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|
+-------+--------+--------+--------+--------+--------+



In [0]:
# Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day.
df1 = df.withColumn('HV Ratio',df['High']/df['Volume'])
df1.select('HV Ratio').show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714682786927E-6|
|6.290848662516662E-6|
| 4.66941298944916E-6|
| 7.36733843444859E-6|
|8.915604814435727E-6|
|8.644477449144044E-6|
|9.351828386844425E-6|
| 8.29141562102703E-6|
|7.712212051589609E-6|
|7.071764777688419...|
|1.015495462653464...|
|  6.5763540967921E-6|
| 5.90145296180676E-6|
|8.547679390846264E-6|
|8.420709512685392E-6|
|1.041448335142357...|
|8.316075435382035E-6|
|9.721183804158345E-6|
|8.029435987746889E-6|
|6.307432228123159E-6|
+--------------------+
only showing top 20 rows



In [0]:
# What day had the Peak High in Price?
df.orderBy(df['High'].desc()).head(1)[0][0]

Out[120]: datetime.datetime(2015, 1, 13, 0, 0)

In [0]:
# What is the mean of the Close column?
df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844997363553|
+-----------------+



In [0]:
# What is the max and min of the Volume column?
df.select(max('Volume'),min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



In [0]:
# How many days was the Close lower than 60 dollars?
df.filter('Close < 60').count()
df.filter(df['Close']<60).count()

Out[123]: 81

In [0]:
# What percentage of the time was the High greater than 80 dollars ?
df.filter(df['High']>80).count()/df.count()*100

Out[124]: 9.141494435612083

In [0]:
# What is the Pearson correlation between High and Volume?
from pyspark.sql.functions import corr,year,month
df.select(corr('High','Volume')).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061799556|
+-------------------+



In [0]:
# What is the max High per year?
yeardf = df.withColumn('Year',year(df['Date']))
maxdf = yeardf.groupBy('Year').max()
maxdf.select('Year','max(High)').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|    90.97|
|2013|    81.37|
|2014|    88.09|
|2012|     77.6|
|2016|    75.19|
+----+---------+



In [0]:
# What is the average Close for each Calendar Month?
monthdf = df.withColumn('Month',month('Date'))
monthavgs = monthdf.select(['Month','Close']).groupBy('Month').mean()
monthavgs.select('Month','avg(Close)').orderBy('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1| 71.4480196131338|
|    2|71.30680438169499|
|    3|71.77794376266337|
|    4|72.97361900692894|
|    5|72.30971685445533|
|    6| 72.4953774506191|
|    7|74.43971944078106|
|    8| 73.0298185521906|
|    9|72.18411782208611|
|   10| 71.5785454489968|
|   11|72.11108927207418|
|   12|72.84792482628012|
+-----+-----------------+

