In [1]:
from pyspark.sql.functions import year
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [2]:
stock_prices = spark.read.format('csv').option('header','true').option('inferSchema','true').option('mode','DROPMALFORMED')\
.load("file:///media/alessandro/storage/big_data-primoProgetto/dataset/historical_stock_prices.csv")

stocks = spark.read.format('csv').option('header','true').option('inferSchema','true').option('mode','DROPMALFORMED')\
.load("file:///media/alessandro/storage/big_data-primoProgetto/dataset/historical_stocks.csv")

In [3]:
stock_prices=stock_prices.select('ticker','close',year("date").alias('year'))

In [4]:
stocks=stocks.select('ticker','sector')

In [5]:
joined = stock_prices.join(stocks, on='ticker')

In [6]:
joined.show(2,truncate= True)

+------+----------------+----+-------+
|ticker|           close|year| sector|
+------+----------------+----+-------+
|   AHH|11.5799999237061|2013|FINANCE|
|   AHH|11.5500001907349|2013|FINANCE|
+------+----------------+----+-------+
only showing top 2 rows



In [7]:
filtered = joined.filter((joined.year <= '2018') & (joined.year >= '2015') & (joined.sector != 'N/A'))

In [8]:
intermediate1 = filtered.groupBy('ticker','year','sector').agg(F.sum(filtered.close).alias('actualQuote'))

In [9]:
intermediate1 = intermediate1.sort(F.desc('ticker'),F.desc('year'))

In [10]:
intermediate2 = intermediate1.withColumn('previousQuote',
                                
                    F.lead('actualQuote').over(Window.partitionBy('ticker').orderBy(F.desc('ticker'),F.desc('year'))))

In [11]:
intermediate2 = intermediate2.sort(F.desc('ticker'),F.desc('year'))

In [12]:
intermediate2.show(2,truncate= True)

+------+----+-----------+------------------+-----------------+
|ticker|year|     sector|       actualQuote|    previousQuote|
+------+----+-----------+------------------+-----------------+
|  ZYNE|2018|HEALTH CARE|1615.8200020790102|3872.840003013611|
|  ZYNE|2017|HEALTH CARE| 3872.840003013611|2483.950002670288|
+------+----+-----------+------------------+-----------------+
only showing top 2 rows



In [13]:
intermediate3 = intermediate2.withColumn('percentage',
          F.round(100*(intermediate2['actualQuote'] - intermediate2['previousQuote']) / intermediate2['previousQuote']))

In [14]:
intermediate3 = intermediate3.select('ticker','year','sector','percentage')

intermediate3 = intermediate3.sort(F.desc('ticker'),F.desc('year'))

In [15]:
intermediate3 = intermediate3.filter((intermediate3.year != '2015'))

In [16]:
intermediate3.show(6)

+------+----+-----------+----------+
|ticker|year|     sector|percentage|
+------+----+-----------+----------+
|  ZYNE|2018|HEALTH CARE|     -58.0|
|  ZYNE|2017|HEALTH CARE|      56.0|
|  ZYNE|2016|HEALTH CARE|      36.0|
|  ZYME|2018|HEALTH CARE|      45.0|
|  ZYME|2017|HEALTH CARE|      null|
|   ZUO|2018| TECHNOLOGY|      null|
+------+----+-----------+----------+
only showing top 6 rows



In [17]:
intermediate4 = intermediate3.groupBy('ticker','sector').agg(F.collect_list(intermediate3.percentage).alias('trend'))

In [18]:
intermediate4 = intermediate4.filter(F.size(intermediate4.trend) == 3)

In [19]:
intermediate4.show(6)

+------+-----------------+--------------------+
|ticker|           sector|               trend|
+------+-----------------+--------------------+
|   VVI|    MISCELLANEOUS| [-29.0, 48.0, 20.0]|
|  TSLA|    CAPITAL GOODS| [-34.0, 49.0, -9.0]|
|  TROV|      HEALTH CARE|[-86.0, -76.0, -3...|
|  SSWN|   TRANSPORTATION|   [-34.0, 1.0, 0.0]|
|  SERV|CONSUMER SERVICES|  [-15.0, 13.0, 9.0]|
|  PTSI|   TRANSPORTATION| [22.0, -2.0, -54.0]|
+------+-----------------+--------------------+
only showing top 6 rows



In [28]:
result = intermediate4.alias('l').join(intermediate4.alias('r'), on='trend')\
    .where('r.sector != l.sector')\
    .select(F.col('l.ticker').alias('ticker1'), F.col('r.ticker').alias('ticker2'), 'l.trend')

In [30]:
result = result.sort('ticker1','ticker2')

In [34]:
result.show(26)

+-------+-------+--------------------+
|ticker1|ticker2|               trend|
+-------+-------+--------------------+
|   AFGE|    USM|  [-36.0, -1.0, 2.0]|
|   AMKR|    UMH| [-41.0, 39.0, 14.0]|
|   ARMK|  USATP|  [-33.0, 13.0, 9.0]|
|   CAKE|    TDS|   [-36.0, 1.0, 1.0]|
|    CCI|    TRV|  [-30.0, 10.0, 7.0]|
|  CHSCM|    GYB|   [-37.0, 1.0, 5.0]|
|  CHSCM|   MDLZ|   [-37.0, 1.0, 5.0]|
|  CHSCP|    XOM|  [-36.0, -5.0, 4.0]|
|    DCP|    UTX| [-28.0, 15.0, -6.0]|
|   DLPH|   GNMK|[-61.0, 21.0, -15.0]|
|    DTQ|    PRH|  [-36.0, -2.0, 3.0]|
|   DUKH|    PJH|  [-35.0, -2.0, 3.0]|
|    EAE|    ZBK|  [-36.0, -3.0, 4.0]|
|   GNMK|   DLPH|[-61.0, 21.0, -15.0]|
|    GYB|  CHSCM|   [-37.0, 1.0, 5.0]|
|    GYB|   MDLZ|   [-37.0, 1.0, 5.0]|
|    HST|    PKE|[-29.0, 14.0, -17.0]|
|    HTA|  SENEA|  [-44.0, 1.0, 16.0]|
|    ISG|    TDJ|   [-35.0, 0.0, 0.0]|
|    JOE|   SSWN|   [-34.0, 1.0, 0.0]|
|  LBTYK|    SUN|[-39.0, -3.0, -30.0]|
|   MDLZ|  CHSCM|   [-37.0, 1.0, 5.0]|
|   MDLZ|    GYB|   [-37.