In [1]:
from pyspark import SparkContext
from pyspark.streaming  import StreamingContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.feature import Bucketizer
import pyspark.sql.functions as f
from pyspark.sql.functions import col

In [2]:
sc = SparkContext()
ssc = StreamingContext(sc,10)
spark = SQLContext(sc)

In [3]:
from pyspark.sql.types import *
newDF=[StructField('ticker',StringType(),True),
       StructField('date',StringType(),True),
       StructField('time',StringType(),True),
       StructField('open',FloatType(),True),
       StructField('high',FloatType(),True),
       StructField('low',FloatType(),True),
       StructField('close',FloatType(),True),
       StructField('vol',IntegerType(),True)
       ]
finalStruct=StructType(fields=newDF)

In [4]:
df = spark.read.csv('/home/indranil/prices/all_csv/*.csv', schema=finalStruct,header=False)

In [5]:
df.printSchema()

root
 |-- ticker: string (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- open: float (nullable = true)
 |-- high: float (nullable = true)
 |-- low: float (nullable = true)
 |-- close: float (nullable = true)
 |-- vol: integer (nullable = true)



In [6]:
df = df.filter(~df['ticker'].isin(['<TICKER>']))

In [7]:
#df.groupBy('ticker').count().show()

In [8]:
df.dtypes

[('ticker', 'string'),
 ('date', 'string'),
 ('time', 'string'),
 ('open', 'float'),
 ('high', 'float'),
 ('low', 'float'),
 ('close', 'float'),
 ('vol', 'int')]

In [9]:
from pyspark.sql.functions import udf, struct

In [10]:
#test_str = "20010102"

In [11]:
#test_str[-2:]
# df = df.withColumn("date", df["date"].cast(DateType()))

In [12]:
df.show()

+------+--------+------+------+------+------+------+---+
|ticker|    date|  time|  open|  high|   low| close|vol|
+------+--------+------+------+------+------+------+---+
|GBPJPY|20010102|230100|171.88|171.88|171.86|171.86|  4|
|GBPJPY|20010102|230200|171.86|171.87|171.86|171.87|  4|
|GBPJPY|20010102|230300|171.87|171.87|171.87|171.87|  4|
|GBPJPY|20010102|230400|171.87|171.88|171.87|171.88|  4|
|GBPJPY|20010102|230500|171.88|171.88|171.88|171.88|  4|
|GBPJPY|20010102|230600|171.88|171.88|171.88|171.88|  4|
|GBPJPY|20010102|230700|171.88|171.88|171.88|171.88|  4|
|GBPJPY|20010102|230800|171.88|171.88|171.87|171.87|  4|
|GBPJPY|20010102|230900|171.87|171.87|171.87|171.87|  4|
|GBPJPY|20010102|231100|171.87|171.88|171.87|171.88|  4|
|GBPJPY|20010102|231200|171.88|171.89|171.88|171.89|  4|
|GBPJPY|20010102|231300|171.89|171.89|171.89|171.89|  4|
|GBPJPY|20010102|231400|171.88|171.88|171.81|171.81|  4|
|GBPJPY|20010102|231500|171.81|171.81| 171.8| 171.8|  4|
|GBPJPY|20010102|231600|171.79|

In [13]:
get_year = udf(lambda x:x[:4],StringType())
get_month = udf(lambda x:x[4:6],StringType())
get_day = udf(lambda x:x[-2:],StringType())
get_hour = udf(lambda x:x[:2],StringType())
get_min = udf(lambda x:x[2:4],StringType())
df = df.withColumn("year", get_year(df['date']))
df = df.withColumn("month", get_month(df['date']))
df = df.withColumn("day", get_day(df['date']))
df = df.withColumn("hour", get_hour(df['time']))
df = df.withColumn("minute", get_min(df['time']))

In [14]:
get_month = udf(lambda x:x[4:6],StringType())
get_day = udf(lambda x:x[-2:],StringType())
get_hour = udf(lambda x:x[:2],StringType())
get_min = udf(lambda x:x[2:4],StringType())

In [15]:
df = df.withColumn("year", get_year(df['date']))
df = df.withColumn("month", get_month(df['date']))
df = df.withColumn("day", get_day(df['date']))
df = df.withColumn("hour", get_hour(df['time']))
df = df.withColumn("minute", get_min(df['time']))

In [16]:
df = df.withColumn("change", col('high')-col('low'))

In [17]:
df = df.withColumn("year", df["year"].cast(IntegerType()))
df = df.withColumn("month", df["month"].cast(IntegerType()))
df = df.withColumn("day", df["day"].cast(IntegerType()))
df = df.withColumn("year", df["year"].cast(IntegerType()))

In [19]:
g_df = df.groupby(['year','month']).agg(f.avg('change'))

In [20]:
g_df =g_df.toPandas()

In [22]:
import numpy as np 

In [25]:
np.sum(np.diff(g_df['avg(change)'].to_list()))

0.01424956577416933

In [25]:

from pyspark.ml.feature import Bucketizer

In [26]:
def approx_quartile(df,col,outcol,qur =8):
    y = list(np.true_divide(np.arange(qur),qur-1))
    y.pop(0)
    splits = df.approxQuantile(col, y, 0.0005) # no need
    splits.insert(0,0)
    bucketizer = Bucketizer(splits=splits,inputCol=col, outputCol=outcol)
    df= bucketizer.transform(df)
    return df
    


In [27]:
df = approx_quartile(df,"high","buckethigh")

In [28]:
df = approx_quartile(df,"low","bucketlow")

In [29]:
df = approx_quartile(df,"open","bucketopen")

In [30]:
df = approx_quartile(df,"close","bucketclose")

In [32]:
df

DataFrame[ticker: string, date: string, time: string, open: float, high: float, low: float, close: float, vol: int, year: string, month: string, day: string, hour: string, minute: string, buckethigh: double, bucketlow: double, bucketopen: double, bucketclose: double]

In [33]:
df_low = df.filter(df['bucketopen'].isin([0.0]))

In [36]:
df_low.groupBy('bucketclose').count().show()

+-----------+--------+
|bucketclose|   count|
+-----------+--------+
|        0.0|15197692|
|        1.0|  113299|
+-----------+--------+



In [37]:
df.groupBy('bucketopen').count().show()

+----------+--------+
|bucketopen|   count|
+----------+--------+
|       0.0|15310991|
|       1.0|15135190|
|       4.0|15184941|
|       3.0|15185339|
|       2.0|15215142|
|       6.0|15163231|
|       5.0|15167450|
+----------+--------+



In [36]:
sum_v =df1.agg(f.sum('close'))

In [37]:
sum_v.show()

+-------------------+
|         sum(close)|
+-------------------+
|5.525987008346418E8|
+-------------------+

