In [1]:
import findspark
findspark.init()

import pyspark
import pandas as pd
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


In [2]:
spark = SparkSession.builder.config("spark.driver.memory", "4g").getOrCreate()

In [3]:
ds = spark.read.format('csv').option('header','true').load('Advertising.csv')

In [None]:
ds = spark.read.csv(path='Advertising.csv',
#                sep=',',
#                encoding='UTF-8',
#                comment=None,
               header=True,
               inferSchema=True)

In [4]:
ds.head(5)

[Row(TV='230.1', Radio='37.8', Newspaper='69.2', Sales='22.1'),
 Row(TV='44.5', Radio='39.3', Newspaper='45.1', Sales='10.4'),
 Row(TV='17.2', Radio='45.9', Newspaper='69.3', Sales='9.3'),
 Row(TV='151.5', Radio='41.3', Newspaper='58.5', Sales='18.5'),
 Row(TV='180.8', Radio='10.8', Newspaper='58.4', Sales='12.9')]

In [5]:
ds.show(5)

+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3|  9.3|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows



In [6]:
ds.printSchema()

root
 |-- TV: string (nullable = true)
 |-- Radio: string (nullable = true)
 |-- Newspaper: string (nullable = true)
 |-- Sales: string (nullable = true)



In [7]:
ds.select('TV', 'Radio', 'Newspaper').show(5)

+-----+-----+---------+
|   TV|Radio|Newspaper|
+-----+-----+---------+
|230.1| 37.8|     69.2|
| 44.5| 39.3|     45.1|
| 17.2| 45.9|     69.3|
|151.5| 41.3|     58.5|
|180.8| 10.8|     58.4|
+-----+-----+---------+
only showing top 5 rows



In [8]:
#Fill None
my_list = [['male', 1, None], ['female', 2, 3],['male', 3, 4]]
ds1 = spark.createDataFrame(my_list, ['A', 'B', 'C'])
ds1 = ds1.fillna(0)

In [9]:
ds1.show()

+------+---+---+
|     A|  B|  C|
+------+---+---+
|  male|  1|  0|
|female|  2|  3|
|  male|  3|  4|
+------+---+---+



In [10]:
# Replace values
ds1.na.replace(['male','female'],['1','0']).show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  1|  0|
|  0|  2|  3|
|  1|  3|  4|
+---+---+---+



In [11]:
# Rename columns   
mapping = {'Newspaper': 'C',
           'Sales': 'D'}

new_names = [mapping.get(col,col) for col in ds1.columns]
new_names

['A', 'B', 'C']

In [12]:
ds1.toDF(*new_names).show(4)

+------+---+---+
|     A|  B|  C|
+------+---+---+
|  male|  1|  0|
|female|  2|  3|
|  male|  3|  4|
+------+---+---+



In [13]:
ds1.withColumnRenamed('Newspaper', 'Paper').show(4)

+------+---+---+
|     A|  B|  C|
+------+---+---+
|  male|  1|  0|
|female|  2|  3|
|  male|  3|  4|
+------+---+---+



In [14]:
# Drop Column
drop_name = ['Newspaper', 'Sales']
ds1.drop(*(drop_name)).show(4)

+------+---+---+
|     A|  B|  C|
+------+---+---+
|  male|  1|  0|
|female|  2|  3|
|  male|  3|  4|
+------+---+---+



In [15]:
# FIltering
ds[ds['Newspaper'] < 20].show(4)

+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|120.2| 19.6|     11.6| 13.2|
|  8.6|  2.1|        1|  4.8|
|214.7|   24|        4| 17.4|
| 97.5|  7.6|      7.2|  9.7|
+-----+-----+---------+-----+
only showing top 4 rows



In [16]:
# Multiple conditions
ds[(ds['Newspaper'] < 20) & (ds['TV'] > 100)].show(4)

+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|120.2| 19.6|     11.6| 13.2|
|214.7|   24|        4| 17.4|
|147.3| 23.9|     19.1| 14.6|
|262.9|  3.5|     19.5|   12|
+-----+-----+---------+-----+
only showing top 4 rows



In [17]:
# Create a new column
import pyspark.sql.functions as F 
ds.withColumn('tv_norm', ds['Tv'] / ds.groupBy().agg(F.sum('TV')).collect()[0][0]).show(100)

+-----+-----+---------+-----+--------------------+
|   TV|Radio|Newspaper|Sales|             tv_norm|
+-----+-----+---------+-----+--------------------+
|230.1| 37.8|     69.2| 22.1|0.007824268493802813|
| 44.5| 39.3|     45.1| 10.4|0.001513167961643...|
| 17.2| 45.9|     69.3|  9.3|5.848649200061207E-4|
|151.5| 41.3|     58.5| 18.5|0.005151571824472517|
|180.8| 10.8|     58.4| 12.9|0.006147882414948061|
|  8.7| 48.9|       75|  7.2|2.958328374449564E-4|
| 57.5| 32.8|     23.5| 11.8|0.001955217029090...|
|120.2| 19.6|     11.6| 13.2|0.004087253685159...|
|  8.6|  2.1|        1|  4.8|2.924324600030603...|
|199.8|  2.6|     21.2| 10.6| 0.00679395412890831|
| 66.1|  5.8|     24.2|  8.6|0.002247649489093...|
|214.7|   24|        4| 17.4|0.007300610367750821|
| 23.8| 35.1|     65.9|  9.2|8.092898311712601E-4|
| 97.5|  7.6|      7.2|  9.7|0.003315368005848...|
|204.1| 32.9|       46|   19| 0.00694017035890984|
|195.4| 47.7|     52.9| 22.4|0.006644337521464884|
| 67.8| 36.6|      114| 12.5|0.

In [18]:
# New column conditions
ds.withColumn('cond', F.when((ds['TV'] > 100) & (ds['Radio'] < 40), 1).when(ds['Sales'] > 10, 2).otherwise(3)).show(4)

+-----+-----+---------+-----+----+
|   TV|Radio|Newspaper|Sales|cond|
+-----+-----+---------+-----+----+
|230.1| 37.8|     69.2| 22.1|   1|
| 44.5| 39.3|     45.1| 10.4|   3|
| 17.2| 45.9|     69.3|  9.3|   3|
|151.5| 41.3|     58.5| 18.5|   2|
+-----+-----+---------+-----+----+
only showing top 4 rows



In [19]:
ds.withColumn('log_TV', F.log(ds['TV'])).show(4)

+-----+-----+---------+-----+------------------+
|   TV|Radio|Newspaper|Sales|            log_TV|
+-----+-----+---------+-----+------------------+
|230.1| 37.8|     69.2| 22.1|  5.43851399704132|
| 44.5| 39.3|     45.1| 10.4|3.7954891891721947|
| 17.2| 45.9|     69.3|  9.3|2.8449093838194073|
|151.5| 41.3|     58.5| 18.5| 5.020585624949424|
+-----+-----+---------+-----+------------------+
only showing top 4 rows



In [20]:
ds.withColumn('TV+10', ds['TV'] + 10).show(4)

+-----+-----+---------+-----+-----+
|   TV|Radio|Newspaper|Sales|TV+10|
+-----+-----+---------+-----+-----+
|230.1| 37.8|     69.2| 22.1|240.1|
| 44.5| 39.3|     45.1| 10.4| 54.5|
| 17.2| 45.9|     69.3|  9.3| 27.2|
|151.5| 41.3|     58.5| 18.5|161.5|
+-----+-----+---------+-----+-----+
only showing top 4 rows



In [21]:
leftp = pd.DataFrame({'A': ['A0', 'A1', 'A2', 'A3'],
                    'B': ['B0', 'B1', 'B2', 'B3'],
                    'C': ['C0', 'C1', 'C2', 'C3'],
                    'D': ['D0', 'D1', 'D2', 'D3']},
                    index=[0, 1, 2, 3])
leftp

Unnamed: 0,A,B,C,D
0,A0,B0,C0,D0
1,A1,B1,C1,D1
2,A2,B2,C2,D2
3,A3,B3,C3,D3


In [22]:
rightp = pd.DataFrame({'A': ['A0', 'A1', 'A6', 'A7'],
                       'F': ['B4', 'B5', 'B6', 'B7'],
                       'G': ['C4', 'C5', 'C6', 'C7'],
                       'H': ['D4', 'D5', 'D6', 'D7']},
                       index=[4, 5, 6, 7])
rightp

Unnamed: 0,A,F,G,H
4,A0,B4,C4,D4
5,A1,B5,C5,D5
6,A6,B6,C6,D6
7,A7,B7,C7,D7


In [23]:
lefts = spark.createDataFrame(leftp)
rights = spark.createDataFrame(rightp)

In [30]:
# Left join
lefts.join(rights, on= 'A', how= 'left').orderBy('A', ascending= True).show()

+---+---+---+---+----+----+----+
|  A|  B|  C|  D|   F|   G|   H|
+---+---+---+---+----+----+----+
| A0| B0| C0| D0|  B4|  C4|  D4|
| A1| B1| C1| D1|  B5|  C5|  D5|
| A2| B2| C2| D2|null|null|null|
| A3| B3| C3| D3|null|null|null|
+---+---+---+---+----+----+----+



In [31]:
# Right join
lefts.join(rights, on='A', how= 'right').orderBy('A', ascending= True).show()

+---+----+----+----+---+---+---+
|  A|   B|   C|   D|  F|  G|  H|
+---+----+----+----+---+---+---+
| A0|  B0|  C0|  D0| B4| C4| D4|
| A1|  B1|  C1|  D1| B5| C5| D5|
| A6|null|null|null| B6| C6| D6|
| A7|null|null|null| B7| C7| D7|
+---+----+----+----+---+---+---+



In [32]:
# Inner Join
lefts.join(rights, on='A', how='inner').orderBy('A', ascending= True).show()

+---+---+---+---+---+---+---+
|  A|  B|  C|  D|  F|  G|  H|
+---+---+---+---+---+---+---+
| A0| B0| C0| D0| B4| C4| D4|
| A1| B1| C1| D1| B5| C5| D5|
+---+---+---+---+---+---+---+



In [33]:
# Full join
lefts.join(rights, on='A', how= 'full').orderBy('A', ascending= True).show()

+---+----+----+----+----+----+----+
|  A|   B|   C|   D|   F|   G|   H|
+---+----+----+----+----+----+----+
| A0|  B0|  C0|  D0|  B4|  C4|  D4|
| A1|  B1|  C1|  D1|  B5|  C5|  D5|
| A2|  B2|  C2|  D2|null|null|null|
| A3|  B3|  C3|  D3|null|null|null|
| A6|null|null|null|  B6|  C6|  D6|
| A7|null|null|null|  B7|  C7|  D7|
+---+----+----+----+----+----+----+



In [35]:
my_list = [('a', 2, 3),
           ('b', 5, 6),
           ('c', 8, 9),
           ('a', 2, 3),
           ('b', 5, 6),
           ('c', 8, 9)]
col_name = ['col1', 'col2', 'col3']
ds = spark.createDataFrame(my_list,schema=col_name)
ds.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   2|   3|
|   b|   5|   6|
|   c|   8|   9|
|   a|   2|   3|
|   b|   5|   6|
|   c|   8|   9|
+----+----+----+



In [36]:
ds.withColumn('concat', F.concat('col1', 'col2')).show()

+----+----+----+------+
|col1|col2|col3|concat|
+----+----+----+------+
|   a|   2|   3|    a2|
|   b|   5|   6|    b5|
|   c|   8|   9|    c8|
|   a|   2|   3|    a2|
|   b|   5|   6|    b5|
|   c|   8|   9|    c8|
+----+----+----+------+



In [37]:
# groupby
ds.groupBy(['col1']).agg({'col2' : 'min',
                          'col3' : 'avg'}).show()

+----+---------+---------+
|col1|min(col2)|avg(col3)|
+----+---------+---------+
|   a|        2|      3.0|
|   b|        5|      6.0|
|   c|        8|      9.0|
+----+---------+---------+



In [39]:
# pivot
ds.groupBy(['col1']).pivot('col2').sum('col3').show()

+----+----+----+----+
|col1|   2|   5|   8|
+----+----+----+----+
|   c|null|null|  18|
|   b|null|  12|null|
|   a|   6|null|null|
+----+----+----+----+



In [60]:
# window
d = {'A':['a','b','c','d'],'B':['m','m','n','n'],'C':[1,2,3,6]}
ds = spark.createDataFrame(pd.DataFrame(d))
ds.show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  m|  1|
|  b|  m|  2|
|  c|  n|  3|
|  d|  n|  6|
+---+---+---+



In [61]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# normal rank
w = Window.orderBy(ds['C'].desc())
ds = ds.withColumn('rank', F.rank().over(w))
ds.show()

+---+---+---+----+
|  A|  B|  C|rank|
+---+---+---+----+
|  d|  n|  6|   1|
|  c|  n|  3|   2|
|  b|  m|  2|   3|
|  a|  m|  1|   4|
+---+---+---+----+



In [53]:
# rank partitionBy('B')
w = Window.partitionBy('B').orderBy(ds['C'])
ds = ds.withColumn('rank', F.rank().over(w))
ds.show()

+---+---+---+----+
|  A|  B|  C|rank|
+---+---+---+----+
|  a|  m|  1|   1|
|  b|  m|  2|   2|
|  c|  n|  3|   1|
|  d|  n|  6|   2|
+---+---+---+----+



In [62]:
# Rank vs Dense_rank
d ={'Id':[1,2,3,4,5,6],
    'Score': [4.00, 4.00, 3.85, 3.65, 3.65, 3.50]}

ds = spark.createDataFrame(pd.DataFrame(d))
ds.show()

+---+-----+
| Id|Score|
+---+-----+
|  1|  4.0|
|  2|  4.0|
|  3| 3.85|
|  4| 3.65|
|  5| 3.65|
|  6|  3.5|
+---+-----+



In [63]:
w = Window.orderBy(ds['Score'].desc())
ds = ds.withColumn('Rank_dense', F.dense_rank().over(w))
ds = ds.withColumn('Rank', F.rank().over(w))
ds.show()

+---+-----+----------+----+
| Id|Score|Rank_dense|Rank|
+---+-----+----------+----+
|  1|  4.0|         1|   1|
|  2|  4.0|         1|   1|
|  3| 3.85|         2|   3|
|  4| 3.65|         3|   4|
|  5| 3.65|         3|   4|
|  6|  3.5|         4|   6|
+---+-----+----------+----+



In [64]:
stock_data = spark.read.csv("Stock_data.csv", header= True )
stock_data.show(5)

+--------+-----+-----+
|    Date|Stock|Price|
+--------+-----+-----+
|8/1/2022|  VNM|73500|
|8/2/2022|  VNM|73400|
|8/3/2022|  VNM|73400|
|8/4/2022|  VNM|73300|
|8/5/2022|  VNM|72600|
+--------+-----+-----+
only showing top 5 rows



In [65]:
stock_data.select('Date').distinct().show()

+---------+
|     Date|
+---------+
| 8/9/2022|
| 8/2/2022|
| 8/4/2022|
|8/14/2022|
| 8/5/2022|
|8/12/2022|
|8/11/2022|
|8/10/2022|
| 8/3/2022|
|8/13/2022|
| 8/7/2022|
| 8/6/2022|
| 8/8/2022|
| 8/1/2022|
+---------+



In [67]:
stock_data.select('Date').orderBy('Date').distinct().show()

+---------+
|     Date|
+---------+
| 8/9/2022|
| 8/2/2022|
| 8/4/2022|
|8/14/2022|
| 8/5/2022|
|8/12/2022|
|8/11/2022|
|8/10/2022|
| 8/3/2022|
|8/13/2022|
| 8/7/2022|
| 8/6/2022|
| 8/8/2022|
| 8/1/2022|
+---------+



In [68]:
stock_data.select('Stock').distinct().show()

+-----+
|Stock|
+-----+
|  ROS|
|  VCB|
|  VNM|
+-----+



In [74]:
ROS = stock_data.filter(stock_data.Stock == 'ROS')
VCB = stock_data.filter(stock_data.Stock == 'VCB')
VNM = stock_data.filter(stock_data.Stock == 'VNM')

ROS = ROS.select('Date','Price')
VCB = VCB.select('Date','Price')
VNM = VNM.select('Date','Price')

ROS = ROS.withColumnRenamed('Price','ROS_Price')
VCB = VCB.withColumnRenamed('Price','VCB_Price')
VNM = VNM.withColumnRenamed('Price','VNM_Price')

output = ROS.join(VCB, on= 'Date', how= 'left').join(VNM, on= 'Date', how= 'left')
output.show()

+---------+---------+---------+---------+
|     Date|ROS_Price|VCB_Price|VNM_Price|
+---------+---------+---------+---------+
| 8/1/2022|     2970|    77000|    73500|
| 8/2/2022|     3170|    78900|    73400|
| 8/3/2022|     2950|    79500|    73400|
| 8/4/2022|     2850|    82500|    73300|
| 8/5/2022|     2800|    82000|    72600|
| 8/6/2022|     2705|    82000|    72400|
| 8/7/2022|     2705|    82000|    72400|
| 8/8/2022|     2610|    82000|    72200|
| 8/9/2022|     2480|    80800|    72000|
|8/10/2022|     2600|    80500|    72500|
|8/11/2022|     2510|    81500|    71100|
|8/12/2022|     2510|    81600|    71900|
|8/13/2022|     2510|    81800|    71500|
|8/14/2022|     2510|    81500|    71800|
+---------+---------+---------+---------+



In [76]:
stock_data = stock_data.withColumn('Price',stock_data['Price'].cast('int'))
stock_data.groupBy('Date').pivot('Stock').sum('Price').show()

+---------+----+-----+-----+
|     Date| ROS|  VCB|  VNM|
+---------+----+-----+-----+
| 8/9/2022|2480|80800|72000|
| 8/4/2022|2850|82500|73300|
| 8/2/2022|3170|78900|73400|
|8/14/2022|2510|81500|71800|
| 8/5/2022|2800|82000|72600|
|8/12/2022|2510|81600|71900|
|8/11/2022|2510|81500|71100|
|8/10/2022|2600|80500|72500|
| 8/3/2022|2950|79500|73400|
|8/13/2022|2510|81800|71500|
| 8/7/2022|2705|82000|72400|
| 8/6/2022|2705|82000|72400|
| 8/8/2022|2610|82000|72200|
| 8/1/2022|2970|77000|73500|
+---------+----+-----+-----+



In [27]:
stock_data.registerTempTable('stock_data')



In [28]:
spark.sql('select * , lag(Price, 1) over (partition by Stock order by Date) as yesterday from stock_data').show(10)

+---------+-----+-----+---------+
|     Date|Stock|Price|yesterday|
+---------+-----+-----+---------+
| 8/1/2022|  ROS| 2970|     null|
|8/10/2022|  ROS| 2600|     2970|
|8/11/2022|  ROS| 2510|     2600|
|8/12/2022|  ROS| 2510|     2510|
|8/13/2022|  ROS| 2510|     2510|
|8/14/2022|  ROS| 2510|     2510|
| 8/2/2022|  ROS| 3170|     2510|
| 8/3/2022|  ROS| 2950|     3170|
| 8/4/2022|  ROS| 2850|     2950|
| 8/5/2022|  ROS| 2800|     2850|
+---------+-----+-----+---------+
only showing top 10 rows

