In [1]:
import sys, glob, os

# Make the PySpark sources that ship with the Spark installation importable.
SPARK_HOME = os.environ.get('SPARK_HOME')
if not SPARK_HOME:
    raise RuntimeError("SPARK_HOME is not set; point it at the Spark installation directory")

# glob returns matches in arbitrary order, so sort to make the pick deterministic
# when more than one py4j archive is present.
py4j_archives = sorted(glob.glob(SPARK_HOME + "/python/lib/py4j*.zip"))
if not py4j_archives:
    raise FileNotFoundError("no py4j*.zip found under " + SPARK_HOME + "/python/lib")

# Skip entries that are already present so re-running this cell leaves sys.path unchanged.
for path_entry in (SPARK_HOME + "/python", py4j_archives[0]):
    if path_entry not in sys.path:
        sys.path.append(path_entry)

# Imported after the sys.path setup so the copy under SPARK_HOME can be found.
from pyspark.sql import SparkSession

# Create spark session
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext
sql = spark.sql  # shorthand for running SQL strings
print(sc.uiWebUrl)  # address of the Spark web UI for this session

http://192.168.1.4:4040


In [2]:
# NOTE(review): the wildcard import is what brings `expr` (used further down) into scope, but it
# can also rebind Python builtins that share a name with a Spark SQL function (e.g. `sum`, `max`,
# `min`) — prefer explicit imports once the set of functions in use is known.
from pyspark.sql.functions import *
# Window / WindowSpec are not referenced in the cells visible here — presumably used later; verify.
from pyspark.sql.window import Window, WindowSpec

In [3]:
# Load the stocks CSV: the first line supplies the column names and Spark infers the column types.
stocks = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/data/stocks.csv")
)

In [4]:
# Preview the data; show()'s defaults (20 rows, long values truncated) spelled out.
stocks.show(n=20, truncate=True)

+-------------------+---------+---------+---------+---------+---------+---------+------+
|               date|     open|     high|      low|    close|   volume| adjclose|symbol|
+-------------------+---------+---------+---------+---------+---------+---------+------+
|2000-07-17 00:00:00|  95.4375|     97.5|    92.75|   96.625|3508100.0|74.269199|  XLNX|
|2000-07-17 00:00:00|   22.625|    22.75|  22.4375|  22.5625| 201600.0| 13.48614|    ES|
|2000-07-17 00:00:00| 6.750002| 6.937503|    6.375|      6.5|1235700.0| 5.241649|   CHK|
|2000-07-17 00:00:00|19.812501|  20.1875|19.500001|  20.1875|1434100.0| 3.806147|    NI|
|2000-07-17 00:00:00|     30.5|  30.6875|     30.0| 30.03125| 254600.0| 19.81183|   SNA|
|2000-07-17 00:00:00|44.749996|45.062498|44.500004|45.000009| 535200.0|17.400773|  FOXA|
|2000-07-17 00:00:00|   19.625|   19.625|    19.25|   19.375| 309500.0|13.768835|     R|
|2000-07-17 00:00:00|  16.6562|  16.6875|   16.125|    16.25|5507200.0| 1.755466|  ROST|
|2000-07-17 00:00:00|

In [5]:
# Register the DataFrame as a temp view so SQL queries can refer to it by name.
stocks_view_name = "stocks"
stocks.createOrReplaceTempView(stocks_view_name)

In [6]:
# Mean of `volume` per symbol over the rows dated in 2016, largest average first.
# Reads the `stocks` temp view, so that view must be registered before this runs.
avg_volume_2016_query = """
select symbol, avg(volume) avg_vol 
    from stocks 
where year(date) = 2016 group by symbol 
order by avg_vol desc
"""
stocks_agg = spark.sql(avg_volume_2016_query)

In [7]:
# Display the top of the per-symbol averages; show()'s defaults (20 rows, truncated values) spelled out.
stocks_agg.show(n=20, truncate=True)

+------+--------------------+
|symbol|             avg_vol|
+------+--------------------+
|   BAC|1.0995368974358974E8|
|   FCX|4.7979558333333336E7|
|   CHK|4.1622735256410256E7|
|  AAPL|4.0944183974358976E7|
|    GE|3.7751663461538464E7|
|     F| 3.743219743589743E7|
|   PFE|3.5777183974358976E7|
|  MSFT| 3.419444807692308E7|
|    FB|2.8902566025641024E7|
|    MU|2.7260807692307692E7|
|    AA|2.6758177564102564E7|
|   MRO| 2.649072564102564E7|
|  CSCO|2.6301159615384616E7|
|  INTC|2.4185176282051284E7|
|     T| 2.371588846153846E7|
|     C| 2.338251282051282E7|
|   KMI|2.1680207692307692E7|
|    RF|2.0861117307692308E7|
|   WFC|1.9612389743589744E7|
|   SWN|1.8398848717948716E7|
+------+--------------------+
only showing top 20 rows



In [8]:
# Write the aggregate as JSON from a single partition, replacing any previous output at the target path.
summary_writer = (
    stocks_agg
    .coalesce(1)
    .write
    .format("json")
    .mode("overwrite")
)
summary_writer.save("/tmp/stocks-summary")

In [9]:
# Summary statistics (count, mean, stddev, min, max) rendered as a pandas frame.
stocks_summary = stocks.describe()
stocks_summary.toPandas()

Unnamed: 0,summary,open,high,low,close,volume,adjclose,symbol
0,count,1857092.0,1857092.0,1857092.0,1857092.0,1857092.0,1857092.0,1857092
1,mean,54.39150019132181,55.03561088277836,53.73182845876957,54.40520389624489,5438876.668468767,43.02996494097698,
2,stddev,60.84195032737054,61.4366673511778,60.21132386902276,60.839881015722526,14000893.2519909,63.518893483888206,
3,min,0.21,0.22,0.19,0.21,0.0,0.18312,A
4,max,1475.0,1476.52002,1462.75,1469.560059,1855410200.0,1553.982779,ZION


In [10]:
# Column name / Spark type pairs — shows which type inferSchema picked for each CSV column.
stocks.dtypes

[('date', 'timestamp'),
 ('open', 'double'),
 ('high', 'double'),
 ('low', 'double'),
 ('close', 'double'),
 ('volume', 'double'),
 ('adjclose', 'double'),
 ('symbol', 'string')]

In [11]:
# Per-row open-to-close change as a fraction of the open price, with `date` reduced to a calendar date.
daily_returns = (
    stocks
    .withColumn("date", expr("cast(date as date) date"))
    .withColumn("pct", expr("(close-open)/open"))
)

# Keep only rows dated after 2016-08-02.
recent_returns = daily_returns.filter("date>'2016-08-02'")

# One row per symbol, one column per remaining date; each cell is the max pct for that symbol and date.
returns_by_day = (
    recent_returns
    .groupBy("symbol")
    .pivot("date")
    .agg(expr("max(pct)"))
)
returns_by_day.toPandas()

Unnamed: 0,symbol,2016-08-03,2016-08-04,2016-08-05,2016-08-08,2016-08-09,2016-08-10,2016-08-11,2016-08-12,2016-08-15
0,LEN,0.007489,0.000874,0.014524,-0.007452,0.006880,-0.002132,0.004045,-0.008063,0.008744
1,GIS,-0.018841,0.007000,0.000709,0.001986,0.002544,0.001409,-0.000422,0.000140,0.000141
2,K,-0.009891,-0.000243,-0.001569,0.000362,0.000724,-0.000843,0.003487,0.000359,0.000120
3,ALXN,0.033251,-0.019902,0.009349,-0.007729,-0.004058,-0.019511,0.006097,0.003761,0.014086
4,SPGI,0.004083,-0.003582,0.000828,-0.005216,0.001162,-0.006550,0.002414,-0.002413,-0.000831
5,AVY,-0.006304,-0.000517,0.007965,0.004979,-0.001395,-0.000508,-0.006219,0.001665,-0.002934
6,AIV,-0.004200,-0.003774,0.008897,0.011018,0.002180,-0.014115,-0.013413,0.004012,0.009756
7,RF,0.014590,-0.002208,0.018438,-0.001064,-0.003191,-0.011740,0.004320,0.002172,0.029064
8,MMM,-0.001902,-0.006426,0.001795,-0.001789,0.000112,0.003930,0.010721,-0.006613,-0.002431
9,PPG,0.001921,-0.001918,-0.001721,0.000287,0.000192,0.000000,-0.000763,-0.005927,0.003169


In [15]:
# Same summary statistics as computed earlier in the notebook, converted to pandas for display.
summary_pdf = stocks.describe().toPandas()
summary_pdf

Unnamed: 0,summary,open,high,low,close,volume,adjclose,symbol
0,count,1857092.0,1857092.0,1857092.0,1857092.0,1857092.0,1857092.0,1857092
1,mean,54.39150019132181,55.03561088277836,53.73182845876957,54.40520389624489,5438876.668468767,43.02996494097698,
2,stddev,60.84195032737054,61.4366673511778,60.21132386902276,60.839881015722526,14000893.2519909,63.518893483888206,
3,min,0.21,0.22,0.19,0.21,0.0,0.18312,A
4,max,1475.0,1476.52002,1462.75,1469.560059,1855410200.0,1553.982779,ZION


In [17]:
# describe() reports every statistic as a string column; print its schema to confirm.
described = stocks.describe()
described.printSchema()

root
 |-- summary: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- adjclose: string (nullable = true)
 |-- symbol: string (nullable = true)



In [13]:
# One JSON document per summary row, gathered into a Python list of strings.
summary_json = stocks.describe().toJSON()
summary_json.collect()

['{"summary":"count","open":"1857092","high":"1857092","low":"1857092","close":"1857092","volume":"1857092","adjclose":"1857092","symbol":"1857092"}',
 '{"summary":"mean","open":"54.39150019132181","high":"55.03561088277836","low":"53.73182845876957","close":"54.40520389624489","volume":"5438876.668468767","adjclose":"43.02996494097698"}',
 '{"summary":"stddev","open":"60.84195032737054","high":"61.436667351177796","low":"60.21132386902276","close":"60.839881015722526","volume":"1.40008932519909E7","adjclose":"63.518893483888206"}',
 '{"summary":"min","open":"0.21","high":"0.22","low":"0.19","close":"0.21","volume":"0.0","adjclose":"0.18312","symbol":"A"}',
 '{"summary":"max","open":"1475.0","high":"1476.52002","low":"1462.75","close":"1469.560059","volume":"1.8554102E9","adjclose":"1553.982779","symbol":"ZION"}']