In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("Step6-Trades") \
    .config("spark.executor.memory", "500mb") \
    .getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [5]:
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType, DoubleType, BooleanType
from pyspark.sql.types import StructType, StructField
import matplotlib.pyplot as plt
import pandas as pd
import re
import seaborn as sns
from os import listdir
from os.path import isfile, join

## Spark DataFrame

### Q1

In [6]:
!ls -lh /opt/spark-data/trades/1400

total 624K
-rwxrwxrwx 1 root root 124K Sep 12 07:05 1400-6-13.csv
-rwxrwxrwx 1 root root 123K Sep 12 07:05 1400-6-14.csv
-rwxrwxrwx 1 root root 125K Sep 12 07:06 1400-6-15.csv
-rwxrwxrwx 1 root root 123K Sep 12 07:07 1400-6-16.csv
-rwxrwxrwx 1 root root 121K Sep 12 07:04 1400-6-20.csv


In [8]:
last_day_file_path = '/opt/spark-data/trades/1400/1400-6-20.csv'
df = spark.read.option("header",True).csv(last_day_file_path)
df = df.withColumn("قیمت پایانی - مقدار", df["قیمت پایانی - مقدار"].cast("int"))
df.orderBy('قیمت پایانی - مقدار',ascending=False)\
.select('نماد','قیمت پایانی - مقدار').limit(10).toPandas()

                                                                                

Unnamed: 0,نماد,قیمت پایانی - مقدار
0,پست0008پ08,1832888
1,سكه0011پ02,1211746
2,سكه0012پ01,1210725
3,سكه0112پ03,1208117
4,سكه0012پ04,1207103
5,سكه0111پ05,1204265
6,صملي4042,1000000
7,اراد61,1000000
8,اراد24,1000000
9,اراد50,1000000


In [9]:
df.orderBy('قیمت پایانی - مقدار',ascending=True)\
.select('نماد','قیمت پایانی - مقدار').limit(10).toPandas()

                                                                                

Unnamed: 0,نماد,قیمت پایانی - مقدار
0,,
1,,
2,اطلس2,1.0
3,كمند2,1.0
4,كيان2,1.0
5,امين يكم2,1.0
6,سحرخيز2,1.0
7,آكورد2,1.0
8,هماي2,1.0
9,عيار2,1.0


### Q2

In [10]:
last_day_file_path = '/opt/spark-data/trades/1400/1400-6-20.csv'
df = spark.read.option("header",True).csv(last_day_file_path)
df = df.withColumn("month", lit(6))
df = df.withColumn("day", lit(20))
f_path='/opt/spark-data/trades/1400/'
onlyfiles = [f for f in listdir(f_path) if isfile(join(f_path, f))]
for f in onlyfiles:
    if f =='1400-6-20.csv':
        continue
    tmp = spark.read.option("header",True).csv(f_path+f)
    splt_date= f.replace('.csv','').split('-')
    month = int(splt_date[1])
    day = int(splt_date[2])
    tmp = tmp.withColumn("month", lit(month))
    tmp = tmp.withColumn("day", lit(day))
    df = df.union(tmp)
df = df.withColumnRenamed("نماد","symbol")\
    .withColumnRenamed('نام','full_name')\
    .withColumnRenamed('تعداد','quantity')\
    .withColumnRenamed('حجم','volume')\
    .withColumnRenamed('ارزش','value')\
    .withColumnRenamed('دیروز','yesterday_qnt')\
    .withColumnRenamed('اولین','first_order_value')\
    .withColumnRenamed('آخرین معامله - مقدار','last_order_value')\
    .withColumnRenamed('آخرین معامله - تغییر','last_order_value_change')\
    .withColumnRenamed('آخرین معامله - درصد','last_order_value_change_percent')\
    .withColumnRenamed('قیمت پایانی - مقدار','close_price')\
    .withColumnRenamed('قیمت پایانی - تغییر','close_price_change')\
    .withColumnRenamed('قیمت پایانی - درصد','close_price_change_percent')\
    .withColumnRenamed('کمترین','min_price')\
    .withColumnRenamed('بیشترین','max_price')
df=df.drop('_c0')
df = df.withColumn("volume", df["volume"].cast("int"))
df = df.withColumn("quantity", df["quantity"].cast("int"))
df = df.withColumn("close_price", df["close_price"].cast("int"))
df = df.withColumn("value", df["value"].cast("int"))
df = df.withColumn("yesterday_qnt", df["yesterday_qnt"].cast("int"))
df = df.withColumn("first_order_value", df["first_order_value"].cast("int"))
df = df.withColumn("last_order_value", df["last_order_value"].cast("int"))
df = df.withColumn("last_order_value_change", df["last_order_value_change"].cast("int"))
df = df.withColumn("last_order_value_change_percent", df["last_order_value_change_percent"].cast("double"))
df = df.withColumn("close_price_change", df["close_price_change"].cast("int"))
df = df.withColumn("close_price_change_percent", df["close_price_change_percent"].cast("double"))
df = df.withColumn("min_price", df["min_price"].cast("int"))
df = df.withColumn("max_price", df["max_price"].cast("int"))

print((df.count(), len(df.columns)))

[Stage 9:>                                                          (0 + 1) / 1]

(4635, 17)


                                                                                

In [11]:
df.cache()

[Stage 10:>                                                         (0 + 1) / 1]                                                                                

symbol,full_name,quantity,volume,value,yesterday_qnt,first_order_value,last_order_value,last_order_value_change,last_order_value_change_percent,close_price,close_price_change,close_price_change_percent,min_price,max_price,month,day
فجهان,مجتمع جهان فولاد ...,3037246,,,1000,5500,5500,,450.0,5500,,450.0,5500,5500,6,20
وهامون,سرمايه گذاري هامو...,75584,114171991.0,,4574,4630,4346,-228.0,-4.98,4446,-128.0,-2.8,4346,4710,6,20
مديريت,س. و خدمات مديريت...,46418,31696338.0,,10065,10568,10568,503.0,5.0,10568,503.0,5.0,10568,10568,6,20
پالايش,صندوق پالايشي يكم...,33343,37789524.0,,83040,84690,82470,-570.0,-0.69,83840,800.0,0.96,82400,86000,6,20
سپيد,سپيد ماكيان,32783,8354563.0,,63420,64680,62500,-920.0,-1.45,63620,200.0,0.32,61320,66560,6,20
دي,بانك دي,27110,858858978.0,,1303,1321,1238,-65.0,-4.99,1260,-43.0,-3.3,1238,1332,6,20
شستا,سرمايه گذاري تامي...,25338,112458233.0,,13460,13690,13420,-40.0,-0.3,13580,120.0,0.89,13270,13960,6,20
بركت,گروه دارويي بركت,23736,39129061.0,,44430,44050,42210,,-5.0,42670,,-3.96,42210,44600,6,20
شبندر,پالايش نفت بندرعباس,17943,237903013.0,,6310,6460,6420,110.0,1.74,6470,160.0,2.54,6360,6620,6,20
دارا يكم,صندوق واسطه گري م...,16686,8849521.0,,137140,139200,135220,,-1.4,137170,30.0,0.02,134910,140410,6,20


In [12]:
df.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- volume: integer (nullable = true)
 |-- value: integer (nullable = true)
 |-- yesterday_qnt: integer (nullable = true)
 |-- first_order_value: integer (nullable = true)
 |-- last_order_value: integer (nullable = true)
 |-- last_order_value_change: integer (nullable = true)
 |-- last_order_value_change_percent: double (nullable = true)
 |-- close_price: integer (nullable = true)
 |-- close_price_change: integer (nullable = true)
 |-- close_price_change_percent: double (nullable = true)
 |-- min_price: integer (nullable = true)
 |-- max_price: integer (nullable = true)
 |-- month: integer (nullable = false)
 |-- day: integer (nullable = false)



In [13]:
df.groupBy('symbol').sum('volume')\
.agg(max(struct(col("sum(volume)"), col("symbol")))\
.alias("پر حجم ترین نماد")).limit(1).toPandas()


                                                                                

Unnamed: 0,پر حجم ترین نماد
0,"(5679418998, دي)"


### Q3

In [14]:
outSchema = StructType([StructField('symbol',StringType(),True),
                        StructField('close_price',IntegerType(),True),
                        StructField('max_diff',IntegerType(),True),
                        StructField('month',IntegerType(),True),
                        StructField('day',IntegerType(),True)
                       ])
def max_diff_extractor(df):
    # pdf is a pandas.DataFrame
    for index, row in df.iterrows():
        max_diff=0
        for index2, row2 in df.iterrows():
            if ((row2.day > row.day and row2.month ==row.month)):
                if ((row2.close_price - row.close_price) > max_diff):
                    max_diff = row2.close_price-row.close_price
                # max_diff = math.max(max_diff,row.close_price-row2.close_price)
        df.loc[index,'max_diff'] =  max_diff
    return df

df_top_diff=df.select('symbol','close_price','month','day')\
.groupby("symbol",'month').applyInPandas(max_diff_extractor,schema=outSchema)

df_top_diff = df_top_diff.groupBy('symbol','month').agg(max(col("max_diff")).alias("max_increase"))\
.sort('max_increase',ascending=False)
# df_top_diff.limit(20).toPandas()

In [15]:
df_top_diff.filter('month = 6').limit(10).toPandas()

                                                                                

Unnamed: 0,symbol,month,max_increase
0,تملي702,6,91013
1,پست0008پ08,6,82888
2,تملي705,6,67005
3,فپنتا,6,42320
4,سنفت007,6,39126
5,زير0103پ04,6,38680
6,تملي803,6,33720
7,مصفها203,6,28900
8,صشستا112,6,28619
9,بكاب,6,27220


### Q4

In [None]:
outSchema = StructType([StructField('symbol',StringType(),True),
                        StructField('close_price',IntegerType(),True),
                        StructField('max_diff',IntegerType(),True),
                        StructField('month',IntegerType(),True),
                        StructField('day',IntegerType(),True)
                       ])
def max_diff_extractor(df):
    # pdf is a pandas.DataFrame
    for index, row in df.iterrows():
        max_diff=0
        for index2, row2 in df.iterrows():
            if ((row2.day > row.day and row2.month ==row.month) or row2.month>row.month):
                if row.close_price-row2.close_price > max_diff:
                    max_diff = row.close_price-row2.close_price
                # max_diff = math.max(max_diff,row.close_price-row2.close_price)
        df.loc[index,'max_diff'] =  max_diff
    return df

df_top_diff=df.select('symbol','close_price','month','day')\
.groupby("symbol").applyInPandas(max_diff_extractor,schema=outSchema)

df_top_diff = df_top_diff.groupBy('symbol').agg(max(col("max_diff")))\
.sort('max(max_diff)',ascending=False)
df_top_diff.limit(20).toPandas()

## Spark SQL

### Preprocess (Create Table)

In [18]:
last_day_file_path = '/opt/spark-data/trades/1400/1400-6-20.csv'
df = spark.read.option("header",True).csv(last_day_file_path)
df = df.withColumn("month", lit(6))
df = df.withColumn("day", lit(20))
f_path='/opt/spark-data/trades/1400/'
onlyfiles = [f for f in listdir(f_path) if isfile(join(f_path, f))]
for f in onlyfiles:
    if f =='1400-6-20.csv':
        continue
    tmp = spark.read.option("header",True).csv(f_path+f)
    splt_date= f.replace('.csv','').split('-')
    month = int(splt_date[1])
    day = int(splt_date[2])
    tmp = tmp.withColumn("month", lit(month))
    tmp = tmp.withColumn("day", lit(day))
    df = df.union(tmp)
df = df.withColumnRenamed("نماد","symbol")\
    .withColumnRenamed('نام','full_name')\
    .withColumnRenamed('تعداد','quantity')\
    .withColumnRenamed('حجم','volume')\
    .withColumnRenamed('ارزش','value')\
    .withColumnRenamed('دیروز','yesterday_qnt')\
    .withColumnRenamed('اولین','first_order_value')\
    .withColumnRenamed('دیروز','yesterday_qnt')\
    .withColumnRenamed('آخرین معامله - مقدار','last_order_value')\
    .withColumnRenamed('آخرین معامله - تغییر','last_order_value_change')\
    .withColumnRenamed('آخرین معامله - درصد','last_order_value_change_percent')\
    .withColumnRenamed('قیمت پایانی - مقدار','close_price')\
    .withColumnRenamed('قیمت پایانی - تغییر','close_price_change')\
    .withColumnRenamed('قیمت پایانی - درصد','close_price_change_percent')\
    .withColumnRenamed('کمترین','min_price')\
    .withColumnRenamed('بیشترین','max_price')
df=df.drop('_c0')
df = df.withColumn("volume", df["volume"].cast("int"))
df = df.withColumn("close_price", df["close_price"].cast("int"))

df.createOrReplaceTempView("bors")
# spark.sql("select * from bors limit 5").show()
spark.sql("select count(*) as total_count from bors").show()



+-----------+
|total_count|
+-----------+
|       4635|
+-----------+



                                                                                

### Q1-2

In [37]:
spark.sql("""SELECT symbol,close_price FROM bors
    WHERE month=6 AND day = 20 
    ORDER BY close_price DESC LIMIT 10"""
    ).toPandas()

                                                                                

Unnamed: 0,symbol,close_price
0,پست0008پ08,1832888
1,سكه0011پ02,1211746
2,سكه0012پ01,1210725
3,سكه0112پ03,1208117
4,سكه0012پ04,1207103
5,سكه0111پ05,1204265
6,صملي4042,1000000
7,اراد61,1000000
8,اراد24,1000000
9,اراد50,1000000


In [38]:
spark.sql("""SELECT symbol,close_price FROM bors 
    WHERE month=6 AND day = 15 
    ORDER BY close_price ASC LIMIT 10"""
).toPandas()

                                                                                

Unnamed: 0,symbol,close_price
0,,
1,,
2,آسام2,1.0
3,خاتم2,1.0
4,عيار2,1.0
5,ارمغان2,1.0
6,كيان2,1.0
7,فردا2,1.0
8,افران2,1.0
9,سپيدما2,1.0


### Q2-2


In [19]:
spark.sql("""SELECT symbol,SUM(volume) as Sum_Volume
    FROM bors 
    GROUP BY symbol 
    ORDER BY Sum_Volume DESC 
    LIMIT 1"""
    ).toPandas()

                                                                                

Unnamed: 0,symbol,Sum_Volume
0,دي,5679418998


### Q3-2

In [20]:
query = """ select T.symbol,
       T.month,
       T.diff
from 
  (select T.symbol, T.month, T.diff,
  row_number() over(partition by T.month order by T.diff desc) as rn
  from 
    (SELECT first_tbl.symbol, first_tbl.month , Max((second_tbl.close_price - first_tbl.close_price)) as diff
    FROM 
      (SELECT * FROM bors) AS first_tbl
      JOIN
      (SELECT * FROM bors) AS second_tbl
      WHERE first_tbl.symbol = second_tbl.symbol AND first_tbl.month = second_tbl.month AND first_tbl.day < second_tbl.day
      GROUP BY first_tbl.symbol, first_tbl.month
      ) as T
     ) as T
where T.rn <= 10 """
spark.sql(query).toPandas()

                                                                                

Unnamed: 0,symbol,month,diff
0,تملي702,6,91013
1,پست0008پ08,6,82888
2,تملي705,6,67005
3,فپنتا,6,42320
4,سنفت007,6,39126
5,زير0103پ04,6,38680
6,تملي803,6,33720
7,مصفها203,6,28900
8,صشستا112,6,28619
9,بكاب,6,27220


### Q4-2

In [21]:
query = """SELECT symbol,MAX(diff) as m_diff from 
(SELECT nowPrice.symbol, (BPrice - APrice) as diff
FROM (SELECT symbol,month,day ,close_price AS APrice FROM bors)AS nowPrice JOIN 
(SELECT symbol, month, day , close_price as BPrice FROM bors) AS sixMonthAgo
WHERE nowPrice.symbol = sixMonthAgo.symbol AND (nowPrice.month > sixMonthAgo.month or 
(nowPrice.month = sixMonthAgo.month and nowPrice.day > sixMonthAgo.day) ) )
as diff_table GROUP by symbol ORDER by m_diff DESC Limit 20"""
spark.sql(query).toPandas()

                                                                                

Unnamed: 0,symbol,m_diff
0,تملي704,71650
1,صشستا112,70763
2,تملي706,57314
3,پست0008پ04,53704
4,تملي612,45167
5,تملي701,45129
6,تملي709,40375
7,تسه9812,34726
8,فپنتا,31250
9,تسه9902,30179


In [43]:
spark.stop()