# Reading Data from a File

In [237]:
!pip install pyspark



In [238]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder.appName("PySpark Basics").getOrCreate()

# Define file location and read options
file_location = "NVL Historical Data.csv"
file_type = "csv"
infer_schema = "False"
first_row_is_header = "True"

# Read the CSV file into a DataFrame
df = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .load(file_location)

# Print the DataFrame schema, data types, and record count
df.printSchema()
print(df.dtypes)
print(df.count())


                                                                                

root
 |-- Date: string (nullable = true)
 |--   Price  : string (nullable = true)
 |--   Open  : string (nullable = true)
 |--   High  : string (nullable = true)
 |--   Low  : string (nullable = true)
 |--   Vol  : string (nullable = true)
 |-- Change: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- mavg: string (nullable = true)

[('Date', 'string'), ('  Price  ', 'string'), ('  Open  ', 'string'), ('  High  ', 'string'), ('  Low  ', 'string'), ('  Vol  ', 'string'), ('Change', 'string'), ('direction', 'string'), ('mavg', 'string')]



[Stage 327:>                                                        (0 + 1) / 1]

1250



                                                                                

# Subset Columns and View a Glimpse of the Data

In [239]:
# Define the list of required columns
columns = ["Date", "Price", "Open", "High", "Low", "Vol", "Change", "direction", "mavg"]

# Rename columns to remove leading and trailing spaces
for column in df.columns:
    df = df.withColumnRenamed(column, column.strip())

# Select the specified columns
df = df.select(columns)

# Show the first 100 rows
df.show(100)

+----------+-----+-----+-----+-----+-----+------+----------+-----------+
|      Date|Price| Open| High|  Low|  Vol|Change| direction|       mavg|
+----------+-----+-----+-----+-----+-----+------+----------+-----------+
|2023-09-29|15550|15800|16150|15550|12.38| -0.01|Decreasing|      15600|
|2023-09-28|15750|15500|16050|15300|20.77|  0.02|Increasing|      15675|
|2023-09-27|15500|15250|15500|14500| 30.5|  0.02|Increasing|15616.66667|
|2023-09-26|15150|15800|16500|15150|33.77| -0.06|Decreasing|      15500|
|2023-09-25|16200|17200|17600|16200|28.12| -0.07|Decreasing|      15640|
|2023-09-22|17400|17700|17800|16950|36.48| -0.04|Decreasing|15933.33333|
|2023-09-21|18200|18750|19150|18000|30.13| -0.02|Decreasing|16257.14286|
|2023-09-20|18500|17200|18500|16600|40.16|  0.07|Increasing|    16537.5|
|2023-09-19|17300|18250|18450|17050|54.72| -0.05|Decreasing|      16750|
|2023-09-18|18150|18900|18900|18050|28.32| -0.04|Decreasing|      17050|
|2023-09-15|18900|19000|19250|18650|26.73|  0.01|In

In [240]:
from pyspark.sql.functions import round

df.select(df.columns[3], round(df.columns[2]), df.columns[6], 'direction').show(10, False)

+-----+--------------+------+----------+
|High |round(Open, 0)|Change|direction |
+-----+--------------+------+----------+
|16150|15800.0       |-0.01 |Decreasing|
|16050|15500.0       |0.02  |Increasing|
|15500|15250.0       |0.02  |Increasing|
|16500|15800.0       |-0.06 |Decreasing|
|17600|17200.0       |-0.07 |Decreasing|
|17800|17700.0       |-0.04 |Decreasing|
|19150|18750.0       |-0.02 |Decreasing|
|18500|17200.0       |0.07  |Increasing|
|18450|18250.0       |-0.05 |Decreasing|
|18900|18900.0       |-0.04 |Decreasing|
+-----+--------------+------+----------+
only showing top 10 rows



# Missing Values


In [241]:
from pyspark.sql.functions import *
#Calculate the missing values in a single column or in multiple columns by using
#the built-in functions in PySpark, as follows
missing_count = df.filter((df['Vol'] == '') | df['Vol'].isNull() | isnan(df['Vol'])).count()
print("Number of missing values in 'Vol' column:", missing_count)

[Stage 334:>                                                        (0 + 1) / 1]

Number of missing values in 'Vol' column: 734



                                                                                

In [242]:
#Calculate all the missing values in the DataFrame,
missing_counts = df.select([count(when((col(c) == '') | col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns])
missing_counts.show()

+----+-----+----+----+---+---+------+---------+----+
|Date|Price|Open|High|Low|Vol|Change|direction|mavg|
+----+-----+----+----+---+---+------+---------+----+
|   0|    0|   0|   0|  0|734|     0|        0|   0|
+----+-----+----+----+---+---+------+---------+----+



# One-Way Frequencies


In [243]:
df.groupBy(df['direction']).count().show()

+----------+-----+
| direction|count|
+----------+-----+
|Increasing|  575|
|Decreasing|  675|
+----------+-----+



# Sorting and Filtering One-Way Frequencies

In [244]:
# Subsetting and creating a temporary DataFrame to eliminate any missing values
df_temp=df.filter((df['mavg']!='')&(df['mavg'].isNotNull()) &
(~isnan(df['mavg'])))
df_temp.show()

+----------+-----+-----+-----+-----+-----+------+----------+-----------+
|      Date|Price| Open| High|  Low|  Vol|Change| direction|       mavg|
+----------+-----+-----+-----+-----+-----+------+----------+-----------+
|2023-09-29|15550|15800|16150|15550|12.38| -0.01|Decreasing|      15600|
|2023-09-28|15750|15500|16050|15300|20.77|  0.02|Increasing|      15675|
|2023-09-27|15500|15250|15500|14500| 30.5|  0.02|Increasing|15616.66667|
|2023-09-26|15150|15800|16500|15150|33.77| -0.06|Decreasing|      15500|
|2023-09-25|16200|17200|17600|16200|28.12| -0.07|Decreasing|      15640|
|2023-09-22|17400|17700|17800|16950|36.48| -0.04|Decreasing|15933.33333|
|2023-09-21|18200|18750|19150|18000|30.13| -0.02|Decreasing|16257.14286|
|2023-09-20|18500|17200|18500|16600|40.16|  0.07|Increasing|    16537.5|
|2023-09-19|17300|18250|18450|17050|54.72| -0.05|Decreasing|      16750|
|2023-09-18|18150|18900|18900|18050|28.32| -0.04|Decreasing|      17050|
|2023-09-15|18900|19000|19250|18650|26.73|  0.01|In

In [245]:
# Subsetting the DataFrame to Open that are repeated more than 20 times
columns=["Date","Open","Price","Vol","mavg","direction"]
df_temp = df.select(columns)
df_temp=df_temp.withColumn("Rounded_Open", round(col("Open"), 0))
df_temp.show()
df_temp.groupby(df_temp['Rounded_Open'])\
.count().filter("`count` >20")\
.sort(col("count").desc()).show(10,False)

+----------+-----+-----+-----+-----------+----------+------------+
|      Date| Open|Price|  Vol|       mavg| direction|Rounded_Open|
+----------+-----+-----+-----+-----------+----------+------------+
|2023-09-29|15800|15550|12.38|      15600|Decreasing|     15800.0|
|2023-09-28|15500|15750|20.77|      15675|Increasing|     15500.0|
|2023-09-27|15250|15500| 30.5|15616.66667|Increasing|     15250.0|
|2023-09-26|15800|15150|33.77|      15500|Decreasing|     15800.0|
|2023-09-25|17200|16200|28.12|      15640|Decreasing|     17200.0|
|2023-09-22|17700|17400|36.48|15933.33333|Decreasing|     17700.0|
|2023-09-21|18750|18200|30.13|16257.14286|Decreasing|     18750.0|
|2023-09-20|17200|18500|40.16|    16537.5|Increasing|     17200.0|
|2023-09-19|18250|17300|54.72|      16750|Decreasing|     18250.0|
|2023-09-18|18900|18150|28.32|      17050|Decreasing|     18900.0|
|2023-09-15|19000|18900|26.73|      17475|Increasing|     19000.0|
|2023-09-14|19600|18700|76.98|   17918.75|Decreasing|     1960

# Casting Variables

In [246]:
#Casting
df = df.withColumn('Vol',df['Vol'].cast("float"))
#After Casting
df.dtypes
df.show()

+----------+-----+-----+-----+-----+-----+------+----------+-----------+
|      Date|Price| Open| High|  Low|  Vol|Change| direction|       mavg|
+----------+-----+-----+-----+-----+-----+------+----------+-----------+
|2023-09-29|15550|15800|16150|15550|12.38| -0.01|Decreasing|      15600|
|2023-09-28|15750|15500|16050|15300|20.77|  0.02|Increasing|      15675|
|2023-09-27|15500|15250|15500|14500| 30.5|  0.02|Increasing|15616.66667|
|2023-09-26|15150|15800|16500|15150|33.77| -0.06|Decreasing|      15500|
|2023-09-25|16200|17200|17600|16200|28.12| -0.07|Decreasing|      15640|
|2023-09-22|17400|17700|17800|16950|36.48| -0.04|Decreasing|15933.33333|
|2023-09-21|18200|18750|19150|18000|30.13| -0.02|Decreasing|16257.14286|
|2023-09-20|18500|17200|18500|16600|40.16|  0.07|Increasing|    16537.5|
|2023-09-19|17300|18250|18450|17050|54.72| -0.05|Decreasing|      16750|
|2023-09-18|18150|18900|18900|18050|28.32| -0.04|Decreasing|      17050|
|2023-09-15|18900|19000|19250|18650|26.73|  0.01|In

In [247]:
#Importing necessary libraries
from pyspark.sql.types import *
#Identifying and assigning lists of variables
float_vars=['Open', 'Price', 'Vol','mavg']
date_vars=['Date']
#Converting variables
for column in float_vars:
 df=df.withColumn(column,df[column].cast(FloatType()))
for column in date_vars:
 df=df.withColumn(column,df[column].cast(DateType()))

df.dtypes


[('Date', 'date'),
 ('Price', 'float'),
 ('Open', 'float'),
 ('High', 'string'),
 ('Low', 'string'),
 ('Vol', 'float'),
 ('Change', 'string'),
 ('direction', 'string'),
 ('mavg', 'float')]

In [248]:
df.show(10,False)

+----------+-------+-------+-----+-----+-----+------+----------+---------+
|Date      |Price  |Open   |High |Low  |Vol  |Change|direction |mavg     |
+----------+-------+-------+-----+-----+-----+------+----------+---------+
|2023-09-29|15550.0|15800.0|16150|15550|12.38|-0.01 |Decreasing|15600.0  |
|2023-09-28|15750.0|15500.0|16050|15300|20.77|0.02  |Increasing|15675.0  |
|2023-09-27|15500.0|15250.0|15500|14500|30.5 |0.02  |Increasing|15616.667|
|2023-09-26|15150.0|15800.0|16500|15150|33.77|-0.06 |Decreasing|15500.0  |
|2023-09-25|16200.0|17200.0|17600|16200|28.12|-0.07 |Decreasing|15640.0  |
|2023-09-22|17400.0|17700.0|17800|16950|36.48|-0.04 |Decreasing|15933.333|
|2023-09-21|18200.0|18750.0|19150|18000|30.13|-0.02 |Decreasing|16257.143|
|2023-09-20|18500.0|17200.0|18500|16600|40.16|0.07  |Increasing|16537.5  |
|2023-09-19|17300.0|18250.0|18450|17050|54.72|-0.05 |Decreasing|16750.0  |
|2023-09-18|18150.0|18900.0|18900|18050|28.32|-0.04 |Decreasing|17050.0  |
+----------+-------+-----

# Descriptive Statistics

In [249]:
columns=["Open","Price","Vol"]
# Subsetting the required columns from the DataFrame
df = df.select(columns)
df.describe().show()

+-------+------------------+------------------+------------------+
|summary|              Open|             Price|               Vol|
+-------+------------------+------------------+------------------+
|  count|              1250|              1250|               516|
|   mean|        45396.5776|        45415.5504|408.38207434314165|
| stddev|24421.615683645086|24423.318754605192| 341.4620800104249|
|    min|           10250.0|           10250.0|              10.0|
|    max|           93969.0|           92366.0|             997.2|
+-------+------------------+------------------+------------------+



In [250]:
#Since unknown values in Open are marked to be 0, let’s filter out those
#values before calculating the median
df_temp = df.filter((df['Open']!=0)&(df['Open'].isNotNull()) &
(~isnan(df['Open'])))
#Here the second parameter indicates the median value, which is 0.5; you
#can also try adjusting the value to calculate other percentiles
median=df.approxQuantile('Open',[0.5],0.1)
#Printing the Value
print ('The median of Open is '+str(median))

The median of Open is [33490.0]


# Unique/Distinct Values and Counts

In [268]:
#You may sometimes just want to know the number of levels (cardinality) within
#a variable. You can do this using the countDistinct function available in Spark
# Counts the distinct occurances of titles
full_df.agg(countDistinct(col("Direction")).alias("Count")).show()

[Stage 393:>                                                        (0 + 1) / 1]

+-----+
|Count|
+-----+
|    2|
+-----+




                                                                                

In [269]:
# Counts the distinct occurances of Date
full_df.select('Date').distinct().show(10,False)


[Stage 397:>                                                        (0 + 1) / 1]

+----------+
|Date      |
+----------+
|2023-05-18|
|2022-10-05|
|2021-11-03|
|2020-04-13|
|2020-02-26|
|2019-08-23|
|2019-08-22|
|2019-08-08|
|2023-04-28|
|2023-04-21|
+----------+
only showing top 10 rows




                                                                                

# Filtering

In [270]:
# Now, let’s find out the dates that do not end with an "17"
df_temp.filter(~col('Date').like('%17')).show(3, False)


[Stage 400:>                                                        (0 + 1) / 1]

+-------+-------+-----+-------------+
|Open   |Price  |Vol  |newcat       |
+-------+-------+-----+-------------+
|15800.0|15550.0|12.38|{Small, High}|
|15500.0|15750.0|20.77|{Small, High}|
|15250.0|15500.0|30.5 |{Small, High}|
+-------+-------+-----+-------------+
only showing top 3 rows




                                                                                

In [271]:
#So, let’s first calculate the max by using the following command. The agg function
#used here is handy instead of using describe when you are looking for a specific statistic:
max_pop=df.agg({'Price': 'max'}).collect()[0]['max(Price)']
print(max_pop)
count_obs= df.count()
print(count_obs)
mean_pop=df.agg({'Price': 'mean'}).collect()[0]['avg(Price)']
print(mean_pop)

                                                                                

92366.0


                                                                                

1250
45415.5504


In [272]:
#The lit function is a way
#to interact with column literals. It is very useful when you want 
#to create a column with a value directly.
df=df.withColumn('mean_Price',lit(mean_pop))
df=df.withColumn('varaiance',pow((df['Price']-df['mean_Price']),2))
variance_sum=df.agg({'varaiance': 'sum'}).collect()[0]['sum(varaiance)']
print(variance_sum)
variance_population= variance_sum/(count_obs-1)
print(variance_population)


745026625237.3237
596498498.9890503


# Creating new column

In [273]:
def new_cols(Vol,Open):
 if Vol<408: Volume_cat='Small'
 elif Vol<508: Volume_cat='Medium'
 else: Volume_cat='Big'
 if Open<3: Open_cat='Low'
 elif Open<5: Open_cat='Mid'
 else: Open_cat='High'
 return Volume_cat,Open_cat
# Apply the user-defined function on the DataFrame
udfB=udf(new_cols,StructType([StructField("Volume_cat", StringType(),
True),StructField("Open_cat", StringType(), True)]))
# Pass a user-defined function with two input columns Open and Close
df_temp=df.select('Open','Price','Vol').withColumn("newcat",udfB("Vol","Open"))
df_temp.show(10)                                                                   
                                                                       

+-------+-------+-----+-------------+
|   Open|  Price|  Vol|       newcat|
+-------+-------+-----+-------------+
|15800.0|15550.0|12.38|{Small, High}|
|15500.0|15750.0|20.77|{Small, High}|
|15250.0|15500.0| 30.5|{Small, High}|
|15800.0|15150.0|33.77|{Small, High}|
|17200.0|16200.0|28.12|{Small, High}|
|17700.0|17400.0|36.48|{Small, High}|
|18750.0|18200.0|30.13|{Small, High}|
|17200.0|18500.0|40.16|{Small, High}|
|18250.0|17300.0|54.72|{Small, High}|
|18900.0|18150.0|28.32|{Small, High}|
+-------+-------+-----+-------------+
only showing top 10 rows



In [274]:
# Unbundle the struct type columns into individual columns and drop the struct type
df_with_newcols = df_temp.select('Open','Price','Vol','newcat')\
.withColumn('Volume_cat', df_temp.newcat\
.getItem('Volume_cat'))\
.withColumn('Open_cat', df_temp.newcat\
.getItem('Open_cat')).drop('newcat')
df_with_newcols.show(10,False)

+-------+-------+-----+----------+--------+
|Open   |Price  |Vol  |Volume_cat|Open_cat|
+-------+-------+-----+----------+--------+
|15800.0|15550.0|12.38|Small     |High    |
|15500.0|15750.0|20.77|Small     |High    |
|15250.0|15500.0|30.5 |Small     |High    |
|15800.0|15150.0|33.77|Small     |High    |
|17200.0|16200.0|28.12|Small     |High    |
|17700.0|17400.0|36.48|Small     |High    |
|18750.0|18200.0|30.13|Small     |High    |
|17200.0|18500.0|40.16|Small     |High    |
|18250.0|17300.0|54.72|Small     |High    |
|18900.0|18150.0|28.32|Small     |High    |
+-------+-------+-----+----------+--------+
only showing top 10 rows




[Stage 414:>                                                        (0 + 1) / 1]

                                                                                

Another way we can achieve the same result is through the when function. One
advantage of using this function is you don’t have to define the output data type. This
is handy for quick and dirty operations. Let’s recreate the preceding columns using the
when function.

In [275]:
# def new_cols(Volume,Open):
#  if Volume<48948400: Volume_cat='Small'
#  elif Volume<58948400: Volume_cat='Medium'
#  else: Volume_cat='Big'
#  if Open<3: Open_cat='Low'
#  elif Open<5: Open_cat='Mid'
#  else: Open_cat='High'
#  return Volume_cat,Open_cat
# In order to comment on multiple lines at once in Jupyter Notebook, 
# you have to select the required lines and then press the Ctrl + /
df_with_newcols = df_temp.select('Open','Price','Vol').\
withColumn('Volume_cat', when(df_temp['Vol']<408,'Small').\
when(df_temp['Vol']<508,'Medium').otherwise('Big')).\
withColumn('Open_cat', when(df_temp['Open']<45000,'Low').
when(df_temp['Open']<50000,'Mid').otherwise('High'))
df_with_newcols.show(10)

+-------+-------+-----+----------+--------+
|   Open|  Price|  Vol|Volume_cat|Open_cat|
+-------+-------+-----+----------+--------+
|15800.0|15550.0|12.38|     Small|     Low|
|15500.0|15750.0|20.77|     Small|     Low|
|15250.0|15500.0| 30.5|     Small|     Low|
|15800.0|15150.0|33.77|     Small|     Low|
|17200.0|16200.0|28.12|     Small|     Low|
|17700.0|17400.0|36.48|     Small|     Low|
|18750.0|18200.0|30.13|     Small|     Low|
|17200.0|18500.0|40.16|     Small|     Low|
|18250.0|17300.0|54.72|     Small|     Low|
|18900.0|18150.0|28.32|     Small|     Low|
+-------+-------+-----+----------+--------+
only showing top 10 rows



# Deleting and Renaming Columns

In [276]:
# Using the drop function. to drop any column or columns 
columns_to_drop=['Price']
df_with_newcols=df_with_newcols.drop(*columns_to_drop)
df_with_newcols.show(10)

+-------+-----+----------+--------+
|   Open|  Vol|Volume_cat|Open_cat|
+-------+-----+----------+--------+
|15800.0|12.38|     Small|     Low|
|15500.0|20.77|     Small|     Low|
|15250.0| 30.5|     Small|     Low|
|15800.0|33.77|     Small|     Low|
|17200.0|28.12|     Small|     Low|
|17700.0|36.48|     Small|     Low|
|18750.0|30.13|     Small|     Low|
|17200.0|40.16|     Small|     Low|
|18250.0|54.72|     Small|     Low|
|18900.0|28.32|     Small|     Low|
+-------+-----+----------+--------+
only showing top 10 rows



In [277]:
# Renaming can be done using 
# either the withColumnRenamed function or the alias function.
df_with_newcols = df_with_newcols\
.withColumnRenamed('Open','Renamed_Open')\
.withColumnRenamed('Vol','Renamed_Volume')
df_with_newcols.show(10)


+------------+--------------+----------+--------+
|Renamed_Open|Renamed_Volume|Volume_cat|Open_cat|
+------------+--------------+----------+--------+
|     15800.0|         12.38|     Small|     Low|
|     15500.0|         20.77|     Small|     Low|
|     15250.0|          30.5|     Small|     Low|
|     15800.0|         33.77|     Small|     Low|
|     17200.0|         28.12|     Small|     Low|
|     17700.0|         36.48|     Small|     Low|
|     18750.0|         30.13|     Small|     Low|
|     17200.0|         40.16|     Small|     Low|
|     18250.0|         54.72|     Small|     Low|
|     18900.0|         28.32|     Small|     Low|
+------------+--------------+----------+--------+
only showing top 10 rows



In [278]:
# To change multiple column names,try the following command:
# Define all the variable changes in the list
new_names = [('Volume_cat','Volume_cat1'),('Open_cat','Open_cat1')]
# Applying the alias function
df_with_newcols_renamed = df_with_newcols\
.select(list(map(lambda old,new:col(old).alias(new),*zip(*new_names))))
df_with_newcols_renamed.show(10)

+-----------+---------+
|Volume_cat1|Open_cat1|
+-----------+---------+
|      Small|      Low|
|      Small|      Low|
|      Small|      Low|
|      Small|      Low|
|      Small|      Low|
|      Small|      Low|
|      Small|      Low|
|      Small|      Low|
|      Small|      Low|
|      Small|      Low|
+-----------+---------+
only showing top 10 rows



# 5.2 Utility Functions and Visualizations

In [279]:
# To concatenate the values of budget_cat and ratings together into a
# single column, we can do so using the concat function. On top of this, let’s change the
# case of the new column to lowercase and trim away any white spaces using the lower
# and trim functions
# Concatenating two variables
df_with_newcols=df_with_newcols.\
withColumn('Open_Volume_Cat',concat(df_with_newcols.\
Volume_cat,df_with_newcols.Open_cat))
df_with_newcols.show(10)

+------------+--------------+----------+--------+---------------+
|Renamed_Open|Renamed_Volume|Volume_cat|Open_cat|Open_Volume_Cat|
+------------+--------------+----------+--------+---------------+
|     15800.0|         12.38|     Small|     Low|       SmallLow|
|     15500.0|         20.77|     Small|     Low|       SmallLow|
|     15250.0|          30.5|     Small|     Low|       SmallLow|
|     15800.0|         33.77|     Small|     Low|       SmallLow|
|     17200.0|         28.12|     Small|     Low|       SmallLow|
|     17700.0|         36.48|     Small|     Low|       SmallLow|
|     18750.0|         30.13|     Small|     Low|       SmallLow|
|     17200.0|         40.16|     Small|     Low|       SmallLow|
|     18250.0|         54.72|     Small|     Low|       SmallLow|
|     18900.0|         28.32|     Small|     Low|       SmallLow|
+------------+--------------+----------+--------+---------------+
only showing top 10 rows



# Registering DataFrames

In [280]:
# Registering temporary table
df_with_newcols.registerTempTable('temp_data')
# Applying the function to show the results
spark.sql('select Volume_cat, count(Volume_cat)\
from temp_data group by Volume_cat').show(10)


[Stage 420:>                                                        (0 + 1) / 1]

+----------+-----------------+
|Volume_cat|count(Volume_cat)|
+----------+-----------------+
|    Medium|               40|
|     Small|              239|
|       Big|              971|
+----------+-----------------+




                                                                                

# Window Functions

In [281]:
# Importing the window functions
from pyspark.sql.window import *
# Step 1: Filtering the missing values
df_with_newcols=df_with_newcols.filter( (df_with_newcols['Renamed_Open'].
isNotNull()) & (~isnan(df_with_newcols['Renamed_Open'])) )

In [282]:
# Step 2: Applying the window functions for calculating deciles
df_with_newcols = df_with_newcols.select("Renamed_Open","Renamed_Volume","Volume_cat",
ntile(10).over(Window.partitionBy().orderBy(df_with_newcols['Renamed_Volume'].
desc())).alias("decile_rank"))
df_with_newcols.show(10)

23/10/02 17:25:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/02 17:25:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/02 17:25:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/02 17:25:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/02 17:25:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------+--------------+----------+-----------+
|Renamed_Open|Renamed_Volume|Volume_cat|decile_rank|
+------------+--------------+----------+-----------+
|     32544.0|         997.2|       Big|          1|
|     31709.0|         997.2|       Big|          1|
|     32822.0|         994.6|       Big|          1|
|     33656.0|         985.1|       Big|          1|
|     33100.0|         982.8|       Big|          1|
|     38552.0|         977.7|       Big|          1|
|     32544.0|         974.4|       Big|          1|
|     36994.0|         974.2|       Big|          1|
|     32544.0|         968.3|       Big|          1|
|     74000.0|         967.4|       Big|          1|
+------------+--------------+----------+-----------+
only showing top 10 rows



In [283]:
# Step 3:Dispalying the values
df_with_newcols.groupby("decile_rank")\
.agg(min('Renamed_Volume').alias('min_Volume'),max('Renamed_Volume')\
.alias('max_Volume'),count('Renamed_Volume')).show()

23/10/02 17:25:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/02 17:25:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/02 17:25:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/02 17:25:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-----------+----------+----------+---------------------+
|decile_rank|min_Volume|max_Volume|count(Renamed_Volume)|
+-----------+----------+----------+---------------------+
|          1|     742.0|     997.2|                  125|
|          2|     482.5|     741.3|                  125|
|          3|     34.69|     479.6|                  125|
|          4|     11.46|     34.31|                  125|
|          5|      10.0|     11.37|                   16|
|          6|      NULL|      NULL|                    0|
|          7|      NULL|      NULL|                    0|
|          8|      NULL|      NULL|                    0|
|          9|      NULL|      NULL|                    0|
|         10|      NULL|      NULL|                    0|
+-----------+----------+----------+---------------------+



# Other Useful Functions

In [None]:
full_df.show(200)
columns=["Date","Open","Close","Volume","direction"]
df_temp=full_df.select(columns)
df_temp=df_temp.withColumn('Year',year('Date'))
df_temp.show(5)

# Sampling

The first
parameter True/False indicates whether you would like to do a sample with or without
replacement. Here, we would like to do it without replacement, so we selected False.
The second parameter is the fraction. It indicates the proportion of the population you
would like to have in the sample. The third parameter is the seed, which guarantees you
the same result when you run this snippet every single time.

In [285]:
# Simple random sampling in PySpark with replacement
df_sample = df.sample(False, 0.4, 11)
df_sample.count()

491

# Pandas Support

In [286]:
# Pandas to PySpark
df_pandas=df.toPandas()
# Pandas to PySpark
df_py = spark.createDataFrame(df_pandas)
df_py.show(5)


[Stage 434:>                                                        (0 + 1) / 1]

+-------+-------+------------------+----------+-------------------+
|   Open|  Price|               Vol|mean_Price|          varaiance|
+-------+-------+------------------+----------+-------------------+
|15800.0|15550.0|12.380000114440918|45415.5504|8.919511006949402E8|
|15500.0|15750.0|20.770000457763672|45415.5504|8.800448805349401E8|
|15250.0|15500.0|              30.5|45415.5504|8.949401557349402E8|
|15800.0|15150.0| 33.77000045776367|45415.5504|9.160035410149401E8|
|17200.0|16200.0|  28.1200008392334|45415.5504|8.535483851749401E8|
+-------+-------+------------------+----------+-------------------+
only showing top 5 rows




                                                                                