# Step 3 (Work with DataFrame & Spark SQL)

## Environment Setup

### Install requirements

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

### Set environment variables

In [2]:
import os
# Export the Java and Spark install locations as environment variables.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

### Import libraries

In [3]:
import findspark

# Call findspark.init() before importing pyspark, keeping the order this cell has always used.
findspark.init()

from pyspark.sql import SparkSession

# Local-mode session using all available cores ("local[*]").
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()

# Property used to format output tables better when a DataFrame is the cell's result.
eager_eval_key = "spark.sql.repl.eagerEval.enabled"
spark.conf.set(eager_eval_key, True)

# Display the session summary as the cell output.
spark

In [118]:
import os
import pandas as pd
from google.colab import drive
from tqdm import tqdm

from pyspark.sql.functions import lit, col
from pyspark.sql.types import DoubleType, IntegerType, StringType, LongType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

### Mount Google Drive to access the stock files

In [5]:
# Mount Google Drive inside the Colab filesystem; the stock files are read from under this path.
drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Mounted at /content/drive


## Make data ready

### Clean the Excel files and convert them to CSV so Spark can read them

In [14]:
stocks_folder_path = '/content/drive/MyDrive/BD Stocks/xlsx'
stocks_csv_folder_path = '/content/drive/MyDrive/BD Stocks/csv'


def convert_excel_to_csv(excel_path, csv_path):
  """Rewrite one stock workbook as a CSV file with a single clean header row.

  The sheet is read with pandas' default header handling; the row at
  position 1 of the resulting frame holds the real column names and the
  data starts at position 2, so the rows before that are discarded.

  Args:
    excel_path: path of the Excel file to read.
    csv_path: path of the CSV file to write (overwritten if it exists).
  """
  raw = pd.read_excel(excel_path)
  header = raw.iloc[1]
  # Work on a copy so assigning the column labels never touches `raw`.
  data = raw.iloc[2:].copy()
  data.columns = header
  # index=False: the pandas index carries no information here and would
  # otherwise be written as an unnamed first column in every CSV.
  data.to_csv(csv_path, index=False)


# Make sure the output folder exists before the first write.
os.makedirs(stocks_csv_folder_path, exist_ok=True)

# Only convert Excel workbooks; skip Office lock files ("~$...") and anything
# else that may be lying in the folder. Sorted for a reproducible order.
excel_file_names = sorted(
  name for name in os.listdir(stocks_folder_path)
  if name.lower().endswith(('.xlsx', '.xls')) and not name.startswith('~$')
)

for file_name in tqdm(excel_file_names):
  file_path = os.path.join(stocks_folder_path, file_name)
  # splitext strips only the final extension, so names containing extra
  # dots are not truncated (and cannot collide) the way split(".")[0] would.
  csv_file_name = os.path.splitext(file_name)[0] + ".csv"
  csv_path = os.path.join(stocks_csv_folder_path, csv_file_name)
  convert_excel_to_csv(file_path, csv_path)

100%|██████████| 40/40 [00:05<00:00,  6.94it/s]


### Make dataframe

In [69]:
# Source (Persian) column name -> (English column name, Spark type to cast to).
# A type of None means the column is only renamed and keeps the string type
# it was read with.
stock_column_spec = {
  'نماد': ('symbol', None),
  'نام': ('full_name', None),
  'تعداد': ('quantity', LongType()),
  'حجم': ('volume', LongType()),
  'ارزش': ('value', LongType()),
  # Cast straight to long: the previous int-then-long double cast would have
  # turned any value that does not fit in 32 bits into null.
  'دیروز': ('yesterday_qnt', LongType()),
  'اولین': ('first_order_value', IntegerType()),
  'آخرین معامله - مقدار': ('last_order_value', IntegerType()),
  'آخرین معامله - تغییر': ('last_order_value_change', DoubleType()),
  'آخرین معامله - درصد': ('last_order_value_change_percent', DoubleType()),
  'قیمت پایانی - مقدار': ('close_price', IntegerType()),
  'قیمت پایانی - تغییر': ('close_price_change', DoubleType()),
  'قیمت پایانی - درصد': ('close_price_change_percent', DoubleType()),
  'کمترین': ('min_price', IntegerType()),
  'بیشترین': ('max_price', IntegerType()),
}


def load_daily_stocks(file_name):
  """Read one daily CSV from `stocks_csv_folder_path` as a typed Spark DataFrame.

  The date is taken from the file name: the part before the first '.' is
  split on '_' and everything after the first two fields must be exactly
  (year, month, day). They are attached as literal string columns.

  Args:
    file_name: name of a CSV file inside `stocks_csv_folder_path`.

  Returns:
    DataFrame with the columns of `stock_column_spec` cast and renamed,
    plus `day`, `month` and `year`, and without the `_c0` index column.

  Raises:
    ValueError: if the file name does not yield exactly three date fields.
  """
  file_path = os.path.join(stocks_csv_folder_path, file_name)
  daily_df = spark.read.csv(file_path, header=True, sep=",")
  year, month, day = file_name.split('.')[0].split('_')[2:]
  daily_df = (daily_df
              .withColumn("day", lit(day))
              .withColumn("month", lit(month))
              .withColumn("year", lit(year)))
  for source_name, (english_name, spark_type) in stock_column_spec.items():
    if spark_type is not None:
      daily_df = daily_df.withColumn(source_name, F.col(source_name).cast(spark_type))
    daily_df = daily_df.withColumnRenamed(source_name, english_name)
  # `_c0` is the unnamed pandas index column, when the CSV has one;
  # dropping a column that is absent is a no-op.
  return daily_df.drop('_c0')


# Only read CSV files; sorted so the union order (and therefore the rows
# shown below) is reproducible between runs.
csv_file_names = sorted(
  name for name in os.listdir(stocks_csv_folder_path) if name.endswith('.csv')
)

final_df = None
for file_name in tqdm(csv_file_names):
  dataframe = load_daily_stocks(file_name)
  # unionByName matches columns by name, so a file whose columns come in a
  # different order cannot silently put values under the wrong header.
  final_df = dataframe if final_df is None else final_df.unionByName(dataframe)

if final_df is None:
  raise FileNotFoundError(f'No CSV files found in {stocks_csv_folder_path}')

print(f'Final dataframe rows count: {final_df.cache().count()}')
print('First 5 rows:')
final_df.show(5)


100%|██████████| 40/40 [00:10<00:00,  3.77it/s]


Final dataframe rows count: 35213
First 5 rows:
+------+--------------------+--------+--------+-------------+-------------+-----------------+----------------+-----------------------+-------------------------------+-----------+------------------+--------------------------+---------+---------+---+-----+----+
|symbol|           full_name|quantity|  volume|        value|yesterday_qnt|first_order_value|last_order_value|last_order_value_change|last_order_value_change_percent|close_price|close_price_change|close_price_change_percent|min_price|max_price|day|month|year|
+------+--------------------+--------+--------+-------------+-------------+-----------------+----------------+-----------------------+-------------------------------+-----------+------------------+--------------------------+---------+---------+---+-----+----+
|وكبهمن|مديريت سرمايه گذا...|  151766|95993891|1681595741898|        17905|            17010|           17385|                 -520.0|                           -2.9|      

### Make temporary table for sql queries

In [70]:
# Expose final_df to Spark SQL under the name `stocks`.
# createOrReplaceTempView replaces the deprecated registerTempTable and makes
# the cell safe to re-run (an existing view of that name is replaced).
final_df.createOrReplaceTempView('stocks')

## Part 1

### With Dataframe

#### Extract last day dataframe

In [71]:
# Rows belonging to the date this notebook treats as the last day: year 1399, month 12, day 27.
is_last_day = (
    (F.col('year') == 1399)
    & (F.col('month') == 12)
    & (F.col('day') == 27)
)
last_day_df = final_df.where(is_last_day)

# Cache the subset (the following cells reuse it), preview it, then report its size.
last_day_df.cache().show(5)
print(f"Final day record count: {last_day_df.count()}")


+--------+--------------------+--------+---------+-------------+-------------+-----------------+----------------+-----------------------+-------------------------------+-----------+------------------+--------------------------+---------+---------+---+-----+----+
|  symbol|           full_name|quantity|   volume|        value|yesterday_qnt|first_order_value|last_order_value|last_order_value_change|last_order_value_change_percent|close_price|close_price_change|close_price_change_percent|min_price|max_price|day|month|year|
+--------+--------------------+--------+---------+-------------+-------------+-----------------+----------------+-----------------------+-------------------------------+-----------+------------------+--------------------------+---------+---------+---+-----+----+
|  پالايش|صندوق پالايشي يکم...|   50519| 54374903|4662042552380|        86360|            87260|           84650|                -1710.0|                          -1.98|      85740|            -620.0|           

#### Most expensive

In [73]:
# Ten last-day rows with the highest closing price.
most_expensive = (last_day_df
                  .sort(F.desc('close_price'))
                  .limit(10))
print("10 Most expensive symbols:")
most_expensive.select('symbol', 'close_price').show()

10 Most expensive symbols:
+----------+-----------+
|    symbol|close_price|
+----------+-----------+
|   سنفت009|    1484467|
|پست0008پ09|    1398458|
|پست0008پ08|    1372579|
|پست0008پ04|    1362991|
|پست0008پ02|    1350000|
|    افاد14|    1105423|
|    صگل309|    1100000|
|سكه0011پ02|    1090808|
|سكه9912پ04|    1090417|
|سكه0012پ04|    1089400|
+----------+-----------+



#### Cheapest

In [74]:
# Ten last-day rows with the lowest closing price.
# Stored under its own name: reusing `most_expensive` here overwrote the
# result of the previous cell with the cheapest symbols.
cheapest = last_day_df.orderBy('close_price').limit(10)
print("10 Cheapest symbols:")
cheapest.select(cheapest['symbol'], cheapest['close_price']).show()

10 Cheapest symbols:
+---------+-----------+
|   symbol|close_price|
+---------+-----------+
|    كيان2|          1|
|امين يكم2|          1|
|   ياقوت2|          1|
| حامي1401|          1|
|   كارين2|          1|
|   آكورد2|          1|
|    كمند2|          1|
|  وبازار2|          1|
|    نهال2|          1|
|  اعتماد2|          1|
+---------+-----------+



### With Spark SQL

#### Most Expensive

In [83]:
# Same query as the DataFrame version: top ten closing prices on 1399/12/27.
# Adjacent string literals are joined by Python into one SQL statement.
most_expensive_query = (
    "SELECT symbol, close_price "
    "FROM stocks "
    "WHERE (year == 1399 AND month==12 AND day==27) "
    "ORDER BY close_price DESC "
    "LIMIT 10;"
)
print("10 Most expensive symbols:")
spark.sql(most_expensive_query).show()

10 Most expensive symbols:
+----------+-----------+
|    symbol|close_price|
+----------+-----------+
|   سنفت009|    1484467|
|پست0008پ09|    1398458|
|پست0008پ08|    1372579|
|پست0008پ04|    1362991|
|پست0008پ02|    1350000|
|    افاد14|    1105423|
|    صگل309|    1100000|
|سكه0011پ02|    1090808|
|سكه9912پ04|    1090417|
|سكه0012پ04|    1089400|
+----------+-----------+



#### Cheapest

In [84]:
# Same query as the DataFrame version: ten lowest closing prices on 1399/12/27.
# Adjacent string literals are joined by Python into one SQL statement.
cheapest_query = (
    "SELECT symbol, close_price "
    "FROM stocks "
    "WHERE (year == 1399 AND month==12 AND day==27) "
    "ORDER BY close_price ASC "
    "LIMIT 10;"
)
print("10 Cheapest symbols:")
spark.sql(cheapest_query).show()

10 Cheapest symbols:
+---------+-----------+
|   symbol|close_price|
+---------+-----------+
|    كيان2|          1|
|امين يكم2|          1|
|   ياقوت2|          1|
| حامي1401|          1|
|   كارين2|          1|
|   آكورد2|          1|
|    كمند2|          1|
|  وبازار2|          1|
|    نهال2|          1|
|  اعتماد2|          1|
+---------+-----------+



## Part 2

### With Dataframe

In [85]:
# Single row with the largest `volume` across the whole data set.
# NOTE(review): this ranks individual daily rows, not the per-symbol total
# volume — confirm that is the intended meaning of "most traded".
most_traded = final_df.sort(F.col('volume').desc()).limit(1)
print("Most traded symbol:")
most_traded.select('symbol', 'volume').show()

Most traded symbol:
+------+-----------+
|symbol|     volume|
+------+-----------+
| فارس4|34152999908|
+------+-----------+



### With Spark SQL

In [86]:
# SQL counterpart of the DataFrame version: the row with the highest volume.
# Adjacent string literals are joined by Python into one SQL statement.
most_traded_query = (
    "SELECT symbol, volume "
    "FROM stocks "
    "ORDER BY volume DESC "
    "LIMIT 1;"
)
print("Most traded symbol:")
spark.sql(most_traded_query).show()

Most traded symbol:
+------+-----------+
|symbol|     volume|
+------+-----------+
| فارس4|34152999908|
+------+-----------+



## Part 3

### With Dataframe

In [121]:
# Total closing-price change of every symbol within each month.
# NOTE(review): rows are grouped by `month` only; if the data ever spans more
# than one year, the same month of different years would be merged — confirm.
symbol_month_change = (final_df
                       .groupBy('symbol', 'month')
                       .agg(F.sum('close_price_change').alias('month_change')))

# Order the symbols of each month from the largest rise downwards.
# Window.partitionBy is used directly on the class; no Window() instance is needed.
window_spec = Window.partitionBy('month').orderBy(F.desc('month_change'))

# row_number() (instead of rank()) matches the Spark SQL version of this part
# and guarantees at most ten rows per month even when symbols tie.
ranked_symbols = symbol_month_change.withColumn("rank", F.row_number().over(window_spec))
print("Most price rised symbol in each month:")
most_rised = ranked_symbols.filter(F.col('rank') <= 10)
most_rised.show()

Most price rised symbol in each month:
+----------+-----+------------+----+
|    symbol|month|month_change|rank|
+----------+-----+------------+----+
|   تملي805|   11|    339281.0|   1|
|كشم0001پ01|   11|    188905.0|   2|
|سكه0112پ03|   11|    166519.0|   3|
|سكه0012پ01|   11|    163059.0|   4|
|سكه9912پ04|   11|    161819.0|   5|
|سكه0011پ02|   11|    161189.0|   6|
|پست0008پ02|   11|    147381.0|   7|
|   اراد134|   11|    145293.0|   8|
|    عكاوه2|   11|    139102.0|   9|
|   تسه9809|   11|    113860.0|  10|
|    افاد14|   12|    348744.0|   1|
|   تملي803|   12|    329504.0|   2|
|   تملي806|   12|    285136.0|   3|
|   تملي804|   12|    236888.0|   4|
|   تملي802|   12|    179685.0|   5|
|    افاد24|   12|    160836.0|   6|
|   تملي703|   12|    149670.0|   7|
|   عپلي جم|   12|    135483.0|   8|
|كشم0006پ02|   12|    129999.0|   9|
|   افاد844|   12|    112789.0|  10|
+----------+-----+------------+----+



### With Spark SQL

In [88]:
# Top ten symbols per month by summed closing-price change:
#   innermost query - monthly total change per symbol,
#   middle query    - row_number() within each month, largest change first,
#   outer query     - keep positions 1..10.
# Adjacent string literals are joined by Python into one SQL statement; the
# "\t" escapes stand for the tab characters used to indent the nested queries.
most_rised_query = (
    "SELECT * "
    "FROM ( "
    "\tSELECT symbol, month, month_change, "
    "\trow_number() over (partition by month order by month_change desc) as symbol_rank "
    "\tFROM ( "
    "\t\tSELECT symbol, month, SUM(close_price_change) as month_change "
    "\t\tFROM stocks  "
    "\t\tGROUP BY symbol, month "
    "\t\t) AS symbol_month_change "
    "\t) AS ranked_symbols "
    "WHERE symbol_rank <= 10;"
)
print("Most price rised symbol in each month:")
spark.sql(most_rised_query).show()

Most price rised symbol in each month:
+----------+-----+------------+-----------+
|    symbol|month|month_change|symbol_rank|
+----------+-----+------------+-----------+
|   تملي805|   11|    339281.0|          1|
|كشم0001پ01|   11|    188905.0|          2|
|سكه0112پ03|   11|    166519.0|          3|
|سكه0012پ01|   11|    163059.0|          4|
|سكه9912پ04|   11|    161819.0|          5|
|سكه0011پ02|   11|    161189.0|          6|
|پست0008پ02|   11|    147381.0|          7|
|   اراد134|   11|    145293.0|          8|
|    عكاوه2|   11|    139102.0|          9|
|   تسه9809|   11|    113860.0|         10|
|    افاد14|   12|    348744.0|          1|
|   تملي803|   12|    329504.0|          2|
|   تملي806|   12|    285136.0|          3|
|   تملي804|   12|    236888.0|          4|
|   تملي802|   12|    179685.0|          5|
|    افاد24|   12|    160836.0|          6|
|   تملي703|   12|    149670.0|          7|
|   عپلي جم|   12|    135483.0|          8|
|كشم0006پ02|   12|    129999.0|      

## Part 4

### With Dataframe

In [127]:
# Sum each symbol's closing-price change over all loaded days, then keep the
# ten smallest (most negative) totals.
total_change = F.sum('close_price_change').alias('six_month_change')
most_fall = (
    final_df
    .groupBy('symbol')
    .agg(total_change)
    .orderBy(F.col('six_month_change').asc())
    .limit(10)
)
print("Most price fall symbol in 6 month:")
most_fall.show()

Most price fall symbol in 6 month:
+--------+----------------+
|  symbol|six_month_change|
+--------+----------------+
|   اطلس2|      -7030216.0|
| اراد314|      -1950368.0|
|تسه99102|      -1807954.0|
|تسه99092|      -1771084.0|
|   آگاس2|      -1769006.0|
|تسه99082|      -1754240.0|
| اراد344|      -1422494.0|
| اراد424|      -1309024.0|
| اراد494|      -1276771.0|
| اراد384|      -1210055.0|
+--------+----------------+



### With Spark SQL

In [90]:
# SQL counterpart of the DataFrame version: ten symbols with the lowest summed
# closing-price change. Adjacent string literals form one SQL statement.
most_fall_query = (
    "SELECT symbol, SUM(close_price_change) as period_change "
    "FROM stocks "
    "GROUP BY symbol "
    "ORDER BY period_change ASC "
    "LIMIT 10;"
)
print("Most price fall symbol in 6 month:")
spark.sql(most_fall_query).show()

Most price fall symbol in 6 month:
+--------+-------------+
|  symbol|period_change|
+--------+-------------+
|   اطلس2|   -7030216.0|
| اراد314|   -1950368.0|
|تسه99102|   -1807954.0|
|تسه99092|   -1771084.0|
|   آگاس2|   -1769006.0|
|تسه99082|   -1754240.0|
| اراد344|   -1422494.0|
| اراد424|   -1309024.0|
| اراد494|   -1276771.0|
| اراد384|   -1210055.0|
+--------+-------------+



## Part 5

### With Dataframe

In [115]:
# Count the rows each symbol has in the data set and list the smallest counts first.
# Presumably one row per symbol per daily file, so a low count means the symbol
# was absent (closed) on most days — verify against the source files.
rows_per_symbol = final_df.groupBy('symbol').count()
most_closed = rows_per_symbol.sort(F.col('count').asc())
print("Most closed symbols:")
most_closed.show()

Most closed symbols:
+---------+-----+
|   symbol|count|
+---------+-----+
|    دروز4|    1|
| تسه99112|    1|
|   اراد58|    1|
| ضسان2011|    1|
|صايپا2032|    1|
|    آسام4|    1|
|  پا ر 18|    1|
| اخزا7182|    1|
|    سدشت2|    1|
|  صگل3092|    1|
|     ملت4|    1|
|  مداران4|    1|
| ضفار1125|    1|
|  تملي808|    1|
| ضغدر3001|    1|
|  اروند09|    1|
|صمعاد4122|    1|
|   اراد37|    1|
|  وتعاون2|    1|
|   افاد55|    1|
+---------+-----+
only showing top 20 rows



### With Spark SQL

In [107]:
# SQL counterpart of the DataFrame version: number of rows per symbol,
# fewest first. Adjacent string literals form one SQL statement.
most_closed_query = (
    "SELECT symbol, COUNT(*) as open_days "
    "FROM stocks "
    "GROUP BY symbol "
    "ORDER BY open_days ASC;"
)
print("Most closed symbols:")
spark.sql(most_closed_query).show()

Most closed symbols:
+---------+---------+
|   symbol|open_days|
+---------+---------+
|    دروز4|        1|
|    آسام4|        1|
|   اراد58|        1|
|  صگل3092|        1|
| تسه99112|        1|
|صايپا2032|        1|
|  پا ر 18|        1|
| اخزا7182|        1|
|  وتعاون2|        1|
|    سدشت2|        1|
|  مداران4|        1|
|   اراد37|        1|
|   افاد55|        1|
| ضسان2011|        1|
|     ملت4|        1|
|  تملي808|        1|
| ضفار1125|        1|
| ضغدر3001|        1|
|صمعاد4122|        1|
|  اروند09|        1|
+---------+---------+
only showing top 20 rows

