<a href="https://colab.research.google.com/github/TranThu1208/my-ciu-tui-voi/blob/master/0_Working_with_Spark_Fundamentals.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install --upgrade pyspark


Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=a61e27621627318a2d75c52c5f7689667b303667c3d8d376c54e2aa9d30d4bc6
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [2]:
!pip install pyspark



# Reading Data from a File

# Subset Columns and View a Glimpse of the Data

In [6]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("PySpark Basics").getOrCreate()

# Define file location and options
file_location = "finance-charts-apple1.csv"
file_type = "csv"
infer_schema = "False"
first_row_is_header = "True"

# Read the CSV file into a DataFrame
df = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .load(file_location)

# Print the schema of the DataFrame
df.printSchema()

# Print the data types of columns
print(df.dtypes)

# Count the number of records in the DataFrame
record_count = df.count()

# Print the total number of records
print('The total number of records in the finance-charts-apple dataset is ' + str(record_count))


root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adjusted: string (nullable = true)
 |-- dn: string (nullable = true)
 |-- mavg: string (nullable = true)
 |-- up: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- Headlines: string (nullable = true)

[('Date', 'string'), ('Open', 'string'), ('High', 'string'), ('Low', 'string'), ('Close', 'string'), ('Volume', 'string'), ('Adjusted', 'string'), ('dn', 'string'), ('mavg', 'string'), ('up', 'string'), ('direction', 'string'), ('Headlines', 'string')]
The total number of records in the finance-charts-apple dataset is 506


In [7]:
full_df = df
# Defining a list to subset the required columns
columns=["Date","Open","Close","Volume","mavg","direction"]
# Subsetting the required columns from the DataFrame
df = df.select(columns)
# The following command shows the data; by default it shows top 20 rows
df.show(100)

+----------+----------+----------+---------+-----------+----------+
|      Date|      Open|     Close|   Volume|       mavg| direction|
+----------+----------+----------+---------+-----------+----------+
|2015-02-17|127.489998|127.830002| 63152400|117.9276669|Increasing|
|2015-02-18|127.629997|128.720001| 44891700|118.9403335|Increasing|
|2015-02-19|128.479996|128.449997| 37362400|119.8891668|Decreasing|
|2015-02-20|128.619995|     129.5| 48948400|120.7635001|Increasing|
|2015-02-23|130.020004|       133| 70974100|121.7201668|Increasing|
|2015-02-24|132.940002|132.169998| 69228100|122.6648335|Decreasing|
|2015-02-25|131.559998|128.789993| 74711700|123.6296667|Decreasing|
|2015-02-26|128.789993|130.419998| 91287500|124.2823333|Increasing|
|2015-02-27|       130|128.460007| 62014800|       NULL|Decreasing|
|2015-03-02|    129.25|129.089996| 48096700|125.4036668|Decreasing|
|2015-03-03|128.960007|129.360001| 37816300|125.9551669|Increasing|
|2015-03-04|129.100006|128.539993| 31666300|126.

In [None]:
from pyspark.sql.functions import round
#You also have the option of selecting the columns by index
#instead of selecting the names from the original DataFrame:
#df.select(df[2],df[1],df[6],'direction').show(5,False) #list index out of range
full_df.select(full_df[2],round(full_df[1]),full_df[7],'direction').show(10,False)

+----------+--------------+-----------+----------+
|High      |round(Open, 0)|dn         |direction |
+----------+--------------+-----------+----------+
|128.880005|127.0         |106.7410523|Increasing|
|128.779999|128.0         |107.842423 |Increasing|
|129.029999|128.0         |108.8942449|Decreasing|
|129.5     |129.0         |109.7854494|Increasing|
|133       |130.0         |110.3725162|Increasing|
|133.600006|133.0         |111.0948689|Decreasing|
|131.600006|132.0         |113.2119183|Decreasing|
|130.869995|129.0         |114.1652991|Increasing|
|130.570007|130.0         |114.9668484|Decreasing|
|130.279999|129.0         |115.8770904|Decreasing|
+----------+--------------+-----------+----------+
only showing top 10 rows



# Missing Values


In [8]:
#Calculate the missing values in a single column or in multiple columns by using
#the built-in functions in PySpark, as follows:
from pyspark.sql.functions import *
full_df.filter((full_df['mavg']=='')|full_df['mavg']\
.isNull()|isnan(full_df['mavg'])).count()

18

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#Calculate all the missing values in the DataFrame,
full_df.select([count(when((col(c)=='') | col(c).isNull() |isnan(c), c)).
alias(c) for c in df.columns]).show()

+----+----+-----+------+----+---------+
|Date|Open|Close|Volume|mavg|direction|
+----+----+-----+------+----+---------+
|   0|   0|    0|     0|  18|        0|
+----+----+-----+------+----+---------+



# One-Way Frequencies


In [None]:
full_df.groupBy(df['direction']).count().show()

+----------+-----+
| direction|count|
+----------+-----+
|Increasing|  278|
|Decreasing|  228|
+----------+-----+



# Sorting and Filtering One-Way Frequencies

In [None]:
# Subsetting and creating a temporary DataFrame to eliminate any missing values
df_temp=df.filter((df['mavg']!='')&(df['mavg'].isNotNull()) &
(~isnan(df['mavg'])))
df_temp.show()

+----------+----------+----------+--------+-----------+----------+
|      Date|      Open|     Close|  Volume|       mavg| direction|
+----------+----------+----------+--------+-----------+----------+
|2015-02-17|127.489998|127.830002|63152400|117.9276669|Increasing|
|2015-02-18|127.629997|128.720001|44891700|118.9403335|Increasing|
|2015-02-19|128.479996|128.449997|37362400|119.8891668|Decreasing|
|2015-02-20|128.619995|     129.5|48948400|120.7635001|Increasing|
|2015-02-23|130.020004|       133|70974100|121.7201668|Increasing|
|2015-02-24|132.940002|132.169998|69228100|122.6648335|Decreasing|
|2015-02-25|131.559998|128.789993|74711700|123.6296667|Decreasing|
|2015-02-26|128.789993|130.419998|91287500|124.2823333|Increasing|
|2015-03-02|    129.25|129.089996|48096700|125.4036668|Decreasing|
|2015-03-03|128.960007|129.360001|37816300|125.9551669|Increasing|
|2015-03-04|129.100006|128.539993|31666300|126.4730002|Decreasing|
|2015-03-06|128.399994|126.599998|72842100|127.2288335|Decreas

In [None]:
# Subsetting the DataFrame to Open that are repeated more than 20 times
columns=["Date","Open","Close","Volume","mavg","direction"]
df_temp = df.select(columns)
df_temp=df_temp.withColumn("Rounded_Open", round(col("Open"), 0))
df_temp.show()
df_temp.groupby(df_temp['Rounded_Open'])\
.count().filter("`count` >20")\
.sort(col("count").desc()).show(10,False)

+----------+----------+----------+--------+-----------+----------+------------+
|      Date|      Open|     Close|  Volume|       mavg| direction|Rounded_Open|
+----------+----------+----------+--------+-----------+----------+------------+
|2015-02-17|127.489998|127.830002|63152400|117.9276669|Increasing|       127.0|
|2015-02-18|127.629997|128.720001|44891700|118.9403335|Increasing|       128.0|
|2015-02-19|128.479996|128.449997|37362400|119.8891668|Decreasing|       128.0|
|2015-02-20|128.619995|     129.5|48948400|120.7635001|Increasing|       129.0|
|2015-02-23|130.020004|       133|70974100|121.7201668|Increasing|       130.0|
|2015-02-24|132.940002|132.169998|69228100|122.6648335|Decreasing|       133.0|
|2015-02-25|131.559998|128.789993|74711700|123.6296667|Decreasing|       132.0|
|2015-02-26|128.789993|130.419998|91287500|124.2823333|Increasing|       129.0|
|2015-02-27|       130|128.460007|62014800|       NULL|Decreasing|       130.0|
|2015-03-02|    129.25|129.089996|480967

# Casting Variables

In [None]:
#Casting
df = df.withColumn('Volume',df['Volume'].cast("float"))
#After Casting
df.dtypes
df.show()

+----------+----------+----------+-----------+-----------+----------+
|      Date|      Open|     Close|     Volume|       mavg| direction|
+----------+----------+----------+-----------+-----------+----------+
|2015-02-17|127.489998|127.830002|  6.31524E7|117.9276669|Increasing|
|2015-02-18|127.629997|128.720001|  4.48917E7|118.9403335|Increasing|
|2015-02-19|128.479996|128.449997|  3.73624E7|119.8891668|Decreasing|
|2015-02-20|128.619995|     129.5|  4.89484E7|120.7635001|Increasing|
|2015-02-23|130.020004|       133|7.0974096E7|121.7201668|Increasing|
|2015-02-24|132.940002|132.169998|6.9228096E7|122.6648335|Decreasing|
|2015-02-25|131.559998|128.789993|7.4711696E7|123.6296667|Decreasing|
|2015-02-26|128.789993|130.419998|9.1287504E7|124.2823333|Increasing|
|2015-02-27|       130|128.460007|  6.20148E7|       NULL|Decreasing|
|2015-03-02|    129.25|129.089996|  4.80967E7|125.4036668|Decreasing|
|2015-03-03|128.960007|129.360001|  3.78163E7|125.9551669|Increasing|
|2015-03-04|129.1000

In [None]:
#Importing necessary libraries
from pyspark.sql.types import *
#Identifying and assigning lists of variables
float_vars=['Open', 'Close', 'Volume','mavg']
date_vars=['Date']
#Converting variables
for column in float_vars:
 df=df.withColumn(column,df[column].cast(FloatType()))
for column in date_vars:
 df=df.withColumn(column,df[column].cast(DateType()))

df.dtypes


[('Date', 'date'),
 ('Open', 'float'),
 ('Close', 'float'),
 ('Volume', 'float'),
 ('mavg', 'float'),
 ('direction', 'string')]

In [None]:
df.show(10,False)

+----------+------+------+-----------+----------+----------+
|Date      |Open  |Close |Volume     |mavg      |direction |
+----------+------+------+-----------+----------+----------+
|2015-02-17|127.49|127.83|6.31524E7  |117.927666|Increasing|
|2015-02-18|127.63|128.72|4.48917E7  |118.94033 |Increasing|
|2015-02-19|128.48|128.45|3.73624E7  |119.88917 |Decreasing|
|2015-02-20|128.62|129.5 |4.89484E7  |120.7635  |Increasing|
|2015-02-23|130.02|133.0 |7.0974096E7|121.72017 |Increasing|
|2015-02-24|132.94|132.17|6.9228096E7|122.66483 |Decreasing|
|2015-02-25|131.56|128.79|7.4711696E7|123.62967 |Decreasing|
|2015-02-26|128.79|130.42|9.1287504E7|124.28233 |Increasing|
|2015-02-27|130.0 |128.46|6.20148E7  |NULL      |Decreasing|
|2015-03-02|129.25|129.09|4.80967E7  |125.40366 |Decreasing|
+----------+------+------+-----------+----------+----------+
only showing top 10 rows



# Descriptive Statistics

In [None]:
columns=["Open","Close","Volume"]
# Subsetting the required columns from the DataFrame
df = df.select(columns)
df.describe().show()

+-------+------------------+------------------+--------------------+
|summary|              Open|             Close|              Volume|
+-------+------------------+------------------+--------------------+
|  count|               506|               506|                 506|
|   mean|112.93499998612837|112.95833972131781| 4.317842094071146E7|
| stddev|11.287489727914043|11.244743807947772|1.9852531320971377E7|
|    min|              90.0|             90.34|           1.14759E7|
|    max|            135.67|            135.51|        1.62206304E8|
+-------+------------------+------------------+--------------------+



In [None]:
#Since unknown values in Open are marked to be 0, let’s filter out those
#values before calculating the median
df_temp = df.filter((df['Open']!=0)&(df['Open'].isNotNull()) &
(~isnan(df['Open'])))
#Here the second parameter indicates the median value, which is 0.5; you
#can also try adjusting the value to calculate other percentiles
median=df.approxQuantile('Open',[0.5],0.1)
#Printing the Value
print ('The median of Open is '+str(median))

The median of Open is [110.19000244140625]


# Unique/Distinct Values and Counts

In [None]:
#You may sometimes just want to know the number of levels (cardinality) within
#a variable. You can do this using the countDistinct function available in Spark
# Counts the distinct occurances of titles
full_df.agg(countDistinct(col("Direction")).alias("Count")).show()

+-----+
|Count|
+-----+
|    2|
+-----+



In [None]:
# Counts the distinct occurances of Date
full_df.select('Date').distinct().show(10,False)

+----------+
|Date      |
+----------+
|2015-05-01|
|2016-08-17|
|2015-02-27|
|2016-04-22|
|2016-08-08|
|2015-02-26|
|2015-11-20|
|2016-07-06|
|2016-03-17|
|2016-04-15|
+----------+
only showing top 10 rows



In [None]:
# Extracting year from the date

columns=["Date","Open","Close","Volume","direction","headlines"]
df_temp=full_df.select(columns)

df_temp=df_temp.withColumn('Year',year('Date'))
df_temp.show(5)
# Extracting month
df_temp=df_temp.withColumn('Month',month('Date'))
df_temp.show(5)
# Extracting day of month
df_temp=df_temp.withColumn('Day_of_Month',dayofmonth('Date'))
df_temp.show(5)
# Calculating the distinct counts by the year
df_temp.groupBy("Year").agg(countDistinct("Direction")).show(5,False)



+----------+----------+----------+--------+----------+--------------------+----+
|      Date|      Open|     Close|  Volume| direction|           headlines|Year|
+----------+----------+----------+--------+----------+--------------------+----+
|2015-02-17|127.489998|127.830002|63152400|Increasing|A group of journa...|2015|
|2015-02-18|127.629997|128.720001|44891700|Increasing|Playboy Enterpris...|2015|
|2015-02-19|128.479996|128.449997|37362400|Decreasing|Hardware and soft...|2015|
|2015-02-20|128.619995|     129.5|48948400|Increasing|GMO Internet, a l...|2015|
|2015-02-23|130.020004|       133|70974100|Increasing|A Tokyo-based rea...|2015|
+----------+----------+----------+--------+----------+--------------------+----+
only showing top 5 rows

+----------+----------+----------+--------+----------+--------------------+----+-----+
|      Date|      Open|     Close|  Volume| direction|           headlines|Year|Month|
+----------+----------+----------+--------+----------+------------------

# Filtering

In [None]:
#Filter all the Directions that start with “Inc"
df_temp.filter(df_temp['Headlines'].like('Playboy%')).show(3,False)

+----------+----------+----------+--------+----------+--------------------------------------------------------------------------------+----+-----+------------+
|Date      |Open      |Close     |Volume  |direction |headlines                                                                       |Year|Month|Day_of_Month|
+----------+----------+----------+--------+----------+--------------------------------------------------------------------------------+----+-----+------------+
|2015-02-18|127.629997|128.720001|44891700|Increasing|Playboy Enterprises, the parent company of Playboy Magazine, is suing cryptocurr|2015|2    |18          |
|2015-03-05|128.580002|126.410004|56517100|Decreasing|Playboy Enterprises, Inc. has announced it is launching its own cryptocurrency a|2015|3    |5           |
+----------+----------+----------+--------+----------+--------------------------------------------------------------------------------+----+-----+------------+



In [None]:
#Now, let’s find out the dates that do not end with an “17”
df_temp.filter(~df_temp['Date'].like('%17')).show(3,False)

+----------+----------+----------+--------+----------+--------------------------------------------------------------------------------+----+-----+------------+
|Date      |Open      |Close     |Volume  |direction |headlines                                                                       |Year|Month|Day_of_Month|
+----------+----------+----------+--------+----------+--------------------------------------------------------------------------------+----+-----+------------+
|2015-02-18|127.629997|128.720001|44891700|Increasing|Playboy Enterprises, the parent company of Playboy Magazine, is suing cryptocurr|2015|2    |18          |
|2015-02-19|128.479996|128.449997|37362400|Decreasing|Hardware and software giant Microsoft reportedly brought back bitcoin payments a|2015|2    |19          |
|2015-02-20|128.619995|129.5     |48948400|Increasing|GMO Internet, a leading Japanese Internet services provider that offers online s|2015|2    |20          |
+----------+----------+----------+------

In [None]:
#find any title that contains “Internet” we could use the rlike function,
#which is a regular expression
df_temp.filter(df_temp['headlines'].rlike('\w*Internet')).show(10,False)

+----------+----------+----------+--------+----------+--------------------------------------------------------------------------------+----+-----+------------+
|Date      |Open      |Close     |Volume  |direction |headlines                                                                       |Year|Month|Day_of_Month|
+----------+----------+----------+--------+----------+--------------------------------------------------------------------------------+----+-----+------------+
|2015-02-20|128.619995|129.5     |48948400|Increasing|GMO Internet, a leading Japanese Internet services provider that offers online s|2015|2    |20          |
|2016-08-03|104.809998|105.790001|30202600|Increasing|VeChain, a China-based Internet of Things (IoT)-focused crypto, has surged by mo|2016|8    |3           |
+----------+----------+----------+--------+----------+--------------------------------------------------------------------------------+----+-----+------------+



In [None]:
#The preceding expression can also be rewritten as follows:
df_temp.filter(df_temp.headlines.contains('Internet')).show()

+----------+----------+----------+--------+----------+--------------------+----+-----+------------+
|      Date|      Open|     Close|  Volume| direction|           headlines|Year|Month|Day_of_Month|
+----------+----------+----------+--------+----------+--------------------+----+-----+------------+
|2015-02-20|128.619995|     129.5|48948400|Increasing|GMO Internet, a l...|2015|    2|          20|
|2016-08-03|104.809998|105.790001|30202600|Increasing|VeChain, a China-...|2016|    8|           3|
+----------+----------+----------+--------+----------+--------------------+----+-----+------------+



In [None]:
#identify variables that end with a particular suffix? Here we
#have three variables that end with “e”; let’s see if we can identify them by tweaking our
#earlier regular expressions
df.select(df.colRegex("`\w*e`")).printSchema()

root
 |-- Close: float (nullable = true)
 |-- Volume: float (nullable = true)



In [None]:
#So, let’s first calculate the max by using the following command. The agg function
#used here is handy instead of using describe when you are looking for a specific statistic:
max_pop=df.agg({'Close': 'max'}).collect()[0]['max(Close)']
print(max_pop)
count_obs= df.count()
print(count_obs)
mean_pop=df.agg({'Close': 'mean'}).collect()[0]['avg(Close)']
print(mean_pop)

135.50999450683594
506
112.95833972131781


In [None]:
#The lit function is a way
#to interact with column literals. It is very useful when you want
#to create a column with a value directly.
df=df.withColumn('mean_Close',lit(mean_pop))
df=df.withColumn('varaiance',pow((df['Close']-df['mean_Close']),2))
variance_sum=df.agg({'varaiance': 'sum'}).collect()[0]['sum(varaiance)']
print(variance_sum)
variance_population= variance_sum/(count_obs-1)
print(variance_population)


63854.35296972171
126.44426330637963


# Creating new column

In [None]:
def new_cols(Volume,Open):
 if Volume<48948400: Volume_cat='Small'
 elif Volume<58948400: Volume_cat='Medium'
 else: Volume_cat='Big'
 if Open<3: Open_cat='Low'
 elif Open<5: Open_cat='Mid'
 else: Open_cat='High'
 return Volume_cat,Open_cat
# Apply the user-defined function on the DataFrame
udfB=udf(new_cols,StructType([StructField("Volume_cat", StringType(),
True),StructField("Open_cat", StringType(), True)]))
# Pass a user-defined function with two input columns Open and Close
df_temp=df.select('Open','Close','Volume').withColumn("newcat",udfB("Volume","Open"))
df_temp.show(10)


[Stage 60:>                                                         (0 + 1) / 1]

+------+------+-----------+--------------+
|  Open| Close|     Volume|        newcat|
+------+------+-----------+--------------+
|127.49|127.83|  6.31524E7|   {Big, High}|
|127.63|128.72|  4.48917E7| {Small, High}|
|128.48|128.45|  3.73624E7| {Small, High}|
|128.62| 129.5|  4.89484E7|{Medium, High}|
|130.02| 133.0|7.0974096E7|   {Big, High}|
|132.94|132.17|6.9228096E7|   {Big, High}|
|131.56|128.79|7.4711696E7|   {Big, High}|
|128.79|130.42|9.1287504E7|   {Big, High}|
| 130.0|128.46|  6.20148E7|   {Big, High}|
|129.25|129.09|  4.80967E7| {Small, High}|
+------+------+-----------+--------------+
only showing top 10 rows



                                                                                

In [None]:
# Unbundle the struct type columns into individual columns and drop the struct type
df_with_newcols = df_temp.select('Open','Close','Volume','newcat')\
.withColumn('Volume_cat', df_temp.newcat\
.getItem('Volume_cat'))\
.withColumn('Open_cat', df_temp.newcat\
.getItem('Open_cat')).drop('newcat')
df_with_newcols.show(10,False)

+------+------+-----------+----------+--------+
|Open  |Close |Volume     |Volume_cat|Open_cat|
+------+------+-----------+----------+--------+
|127.49|127.83|6.31524E7  |Big       |High    |
|127.63|128.72|4.48917E7  |Small     |High    |
|128.48|128.45|3.73624E7  |Small     |High    |
|128.62|129.5 |4.89484E7  |Medium    |High    |
|130.02|133.0 |7.0974096E7|Big       |High    |
|132.94|132.17|6.9228096E7|Big       |High    |
|131.56|128.79|7.4711696E7|Big       |High    |
|128.79|130.42|9.1287504E7|Big       |High    |
|130.0 |128.46|6.20148E7  |Big       |High    |
|129.25|129.09|4.80967E7  |Small     |High    |
+------+------+-----------+----------+--------+
only showing top 10 rows



Another way we can achieve the same result is through the when function. One
advantage of using this function is you don’t have to define the output data type. This
is handy for quick and dirty operations. Let’s recreate the preceding columns using the
when function.

In [None]:
# def new_cols(Volume,Open):
#  if Volume<48948400: Volume_cat='Small'
#  elif Volume<58948400: Volume_cat='Medium'
#  else: Volume_cat='Big'
#  if Open<3: Open_cat='Low'
#  elif Open<5: Open_cat='Mid'
#  else: Open_cat='High'
#  return Volume_cat,Open_cat
# In order to comment on multiple lines at once in Jupyter Notebook,
# you have to select the required lines and then press the Ctrl + /
df_with_newcols = df_temp.select('Open','Close','Volume').\
withColumn('Volume_cat', when(df_temp['Volume']<48948400,'Small').\
when(df_temp['Volume']<58948400,'Medium').otherwise('Big')).\
withColumn('Open_cat', when(df_temp['Open']<120,'Low').
when(df_temp['Open']<135,'Mid').otherwise('High'))
df_with_newcols.show(10)

+------+------+-----------+----------+--------+
|  Open| Close|     Volume|Volume_cat|Open_cat|
+------+------+-----------+----------+--------+
|127.49|127.83|  6.31524E7|       Big|     Mid|
|127.63|128.72|  4.48917E7|     Small|     Mid|
|128.48|128.45|  3.73624E7|     Small|     Mid|
|128.62| 129.5|  4.89484E7|    Medium|     Mid|
|130.02| 133.0|7.0974096E7|       Big|     Mid|
|132.94|132.17|6.9228096E7|       Big|     Mid|
|131.56|128.79|7.4711696E7|       Big|     Mid|
|128.79|130.42|9.1287504E7|       Big|     Mid|
| 130.0|128.46|  6.20148E7|       Big|     Mid|
|129.25|129.09|  4.80967E7|     Small|     Mid|
+------+------+-----------+----------+--------+
only showing top 10 rows



# Deleting and Renaming Columns

In [None]:
# Using the drop function. to drop any column or columns
columns_to_drop=['Close']
df_with_newcols=df_with_newcols.drop(*columns_to_drop)
df_with_newcols.show(10)

+------+-----------+----------+--------+
|  Open|     Volume|Volume_cat|Open_cat|
+------+-----------+----------+--------+
|127.49|  6.31524E7|       Big|     Mid|
|127.63|  4.48917E7|     Small|     Mid|
|128.48|  3.73624E7|     Small|     Mid|
|128.62|  4.89484E7|    Medium|     Mid|
|130.02|7.0974096E7|       Big|     Mid|
|132.94|6.9228096E7|       Big|     Mid|
|131.56|7.4711696E7|       Big|     Mid|
|128.79|9.1287504E7|       Big|     Mid|
| 130.0|  6.20148E7|       Big|     Mid|
|129.25|  4.80967E7|     Small|     Mid|
+------+-----------+----------+--------+
only showing top 10 rows



In [None]:
# Renaming can be done using
# either the withColumnRenamed function or the alias function.
df_with_newcols = df_with_newcols\
.withColumnRenamed('Open','Renamed_Open')\
.withColumnRenamed('Volume','Renamed_Volume')
df_with_newcols.show(10)


+------------+--------------+----------+--------+
|Renamed_Open|Renamed_Volume|Volume_cat|Open_cat|
+------------+--------------+----------+--------+
|      127.49|     6.31524E7|       Big|     Mid|
|      127.63|     4.48917E7|     Small|     Mid|
|      128.48|     3.73624E7|     Small|     Mid|
|      128.62|     4.89484E7|    Medium|     Mid|
|      130.02|   7.0974096E7|       Big|     Mid|
|      132.94|   6.9228096E7|       Big|     Mid|
|      131.56|   7.4711696E7|       Big|     Mid|
|      128.79|   9.1287504E7|       Big|     Mid|
|       130.0|     6.20148E7|       Big|     Mid|
|      129.25|     4.80967E7|     Small|     Mid|
+------------+--------------+----------+--------+
only showing top 10 rows



In [None]:
# To change multiple column names,try the following command:
# Define all the variable changes in the list
new_names = [('Volume_cat','Volume_cat1'),('Open_cat','Open_cat1')]
# Applying the alias function
df_with_newcols_renamed = df_with_newcols\
.select(list(map(lambda old,new:col(old).alias(new),*zip(*new_names))))
df_with_newcols_renamed.show(10)

+-----------+---------+
|Volume_cat1|Open_cat1|
+-----------+---------+
|        Big|      Mid|
|      Small|      Mid|
|      Small|      Mid|
|     Medium|      Mid|
|        Big|      Mid|
|        Big|      Mid|
|        Big|      Mid|
|        Big|      Mid|
|        Big|      Mid|
|      Small|      Mid|
+-----------+---------+
only showing top 10 rows



# 5.2 Utility Functions and Visualizations

In [None]:
# To concatenate the values of budget_cat and ratings together into a
# single column, we can do so using the concat function. On top of this, let’s change the
# case of the new column to lowercase and trim away any white spaces using the lower
# and trim functions
# Concatenating two variables
df_with_newcols=df_with_newcols.\
withColumn('Open_Volume_Cat',concat(df_with_newcols.\
Volume_cat,df_with_newcols.Open_cat))
df_with_newcols.show(10)

+------------+--------------+----------+--------+---------------+
|Renamed_Open|Renamed_Volume|Volume_cat|Open_cat|Open_Volume_Cat|
+------------+--------------+----------+--------+---------------+
|      127.49|     6.31524E7|       Big|     Mid|         BigMid|
|      127.63|     4.48917E7|     Small|     Mid|       SmallMid|
|      128.48|     3.73624E7|     Small|     Mid|       SmallMid|
|      128.62|     4.89484E7|    Medium|     Mid|      MediumMid|
|      130.02|   7.0974096E7|       Big|     Mid|         BigMid|
|      132.94|   6.9228096E7|       Big|     Mid|         BigMid|
|      131.56|   7.4711696E7|       Big|     Mid|         BigMid|
|      128.79|   9.1287504E7|       Big|     Mid|         BigMid|
|       130.0|     6.20148E7|       Big|     Mid|         BigMid|
|      129.25|     4.80967E7|     Small|     Mid|       SmallMid|
+------------+--------------+----------+--------+---------------+
only showing top 10 rows



# Registering DataFrames

In [None]:
# Registering temporary table
df_with_newcols.registerTempTable('temp_data')
# Applying the function to show the results
spark.sql('select Volume_cat, count(Volume_cat)\
from temp_data group by Volume_cat').show(10)



+----------+-----------------+
|Volume_cat|count(Volume_cat)|
+----------+-----------------+
|    Medium|               62|
|     Small|              364|
|       Big|               80|
+----------+-----------------+



# Window Functions

In [None]:
# Importing the window functions
from pyspark.sql.window import *
# Step 1: Filtering the missing values
df_with_newcols=df_with_newcols.filter( (df_with_newcols['Renamed_Open'].
isNotNull()) & (~isnan(df_with_newcols['Renamed_Open'])) )

In [None]:
# Step 2: Applying the window functions for calculating deciles
df_with_newcols = df_with_newcols.select("Renamed_Open","Renamed_Volume","Volume_cat",
ntile(10).over(Window.partitionBy().orderBy(df_with_newcols['Renamed_Volume'].
desc())).alias("decile_rank"))
df_with_newcols.show(10)

23/12/19 15:43:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/19 15:43:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/19 15:43:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/19 15:43:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/19 15:43:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------+--------------+----------+-----------+
|Renamed_Open|Renamed_Volume|Volume_cat|decile_rank|
+------------+--------------+----------+-----------+
|       94.87|  1.62206304E8|       Big|          1|
|       96.04|  1.33369696E8|       Big|          1|
|      110.43|  1.28275504E8|       Big|          1|
|      117.42|    1.241386E8|       Big|          1|
|      134.46|     1.18924E8|       Big|          1|
|      121.99|    1.154506E8|       Big|          1|
|        96.0|  1.14602096E8|       Big|          1|
|      127.03|     1.11985E8|       Big|          1|
|      108.73|  1.10888704E8|       Big|          1|
|      111.11|    1.036016E8|       Big|          1|
+------------+--------------+----------+-----------+
only showing top 10 rows



In [None]:
# Step 3:Dispalying the values
df_with_newcols.groupby("decile_rank")\
.agg(min('Renamed_Volume').alias('min_Volume'),max('Renamed_Volume')\
.alias('max_Volume'),count('Renamed_Volume')).show()

23/12/19 15:43:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/19 15:43:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/19 15:43:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/19 15:43:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-----------+----------+------------+---------------------+
|decile_rank|min_Volume|  max_Volume|count(Renamed_Volume)|
+-----------+----------+------------+---------------------+
|          1| 6.85016E7|1.62206304E8|                   51|
|          2| 5.48436E7|   6.84574E7|                   51|
|          3| 4.74457E7|   5.47169E7|                   51|
|          4|  4.2885E7|   4.70997E7|                   51|
|          5| 3.73572E7|   4.28032E7|                   51|
|          6| 3.41439E7|   3.73562E7|                   51|
|          7| 3.16663E7|   3.40393E7|                   50|
|          8| 2.77663E7|   3.15619E7|                   50|
|          9| 2.51396E7|   2.77337E7|                   50|
|         10| 1.14759E7|   2.50862E7|                   50|
+-----------+----------+------------+---------------------+



# Other Useful Functions

In [None]:
full_df.show(200)
columns=["Date","Open","Close","Volume","direction","headlines"]
df_temp=full_df.select(columns)
df_temp=df_temp.withColumn('Year',year('Date'))
df_temp.show(5)

+----------+----------+----------+----------+----------+---------+----------+-----------+-----------+-----------+----------+--------------------+
|      Date|      Open|      High|       Low|     Close|   Volume|  Adjusted|         dn|       mavg|         up| direction|           Headlines|
+----------+----------+----------+----------+----------+---------+----------+-----------+-----------+-----------+----------+--------------------+
|2015-02-17|127.489998|128.880005|126.919998|127.830002| 63152400|122.905254|106.7410523|117.9276669|129.1142814|Increasing|A group of journa...|
|2015-02-18|127.629997|128.779999|127.449997|128.720001| 44891700|123.760965| 107.842423|118.9403335|130.0382439|Increasing|Playboy Enterpris...|
|2015-02-19|128.479996|129.029999|128.330002|128.449997| 37362400|123.501363|108.8942449|119.8891668|130.8840887|Decreasing|Hardware and soft...|
|2015-02-20|128.619995|     129.5|128.050003|     129.5| 48948400|124.510914|109.7854494|120.7635001|131.7415509|Increasing|

In [None]:
# Apply collect_list function to gather all occurrences
df_temp.filter(df_temp['Headlines'].like('%Crypto%')).groupby('direction')\
.agg(collect_list("Year")).show(1,False)


+----------+------------------------------------------------------------------------------------------------------------------------------------+
|direction |collect_list(Year)                                                                                                                  |
+----------+------------------------------------------------------------------------------------------------------------------------------------+
|Increasing|[2015, 2015, 2015, 2015, 2015, 2015, 2015, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016]|
+----------+------------------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



# Sampling

The first
parameter True/False indicates whether you would like to do a sample with or without
replacement. Here, we would like to do it without replacement, so we selected False.
The second parameter is the fraction. It indicates the proportion of the population you
would like to have in the sample. The third parameter is the seed, which guarantees you
the same result when you run this snippet every single time.

In [None]:
# Simple random sampling in PySpark with replacement
df_sample = df.sample(False, 0.4, 11)
df_sample.count()

207

In [None]:
# Stratified sampling in PySpark
df = df_temp.withColumn('Year',year('Date'))
df_strat = df.sampleBy("Year", fractions={2015: 0.2, 2016: 0.4,
2017: 0.4}, seed=11)
df_strat.count()

162

# Pandas Support

In [None]:
# Pandas to PySpark
df_pandas=df.toPandas()
# Pandas to PySpark
df_py = spark.createDataFrame(df_pandas)
df_py.show(5)

[Stage 88:>                                                         (0 + 1) / 1]

+----------+----------+----------+--------+----------+--------------------+----+
|      Date|      Open|     Close|  Volume| direction|           headlines|Year|
+----------+----------+----------+--------+----------+--------------------+----+
|2015-02-17|127.489998|127.830002|63152400|Increasing|A group of journa...|2015|
|2015-02-18|127.629997|128.720001|44891700|Increasing|Playboy Enterpris...|2015|
|2015-02-19|128.479996|128.449997|37362400|Decreasing|Hardware and soft...|2015|
|2015-02-20|128.619995|     129.5|48948400|Increasing|GMO Internet, a l...|2015|
|2015-02-23|130.020004|       133|70974100|Increasing|A Tokyo-based rea...|2015|
+----------+----------+----------+--------+----------+--------------------+----+
only showing top 5 rows



                                                                                