In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession as sess

In [3]:
from IPython.display import display, HTML
display(HTML('<style>pre { white-space: pre !important; }</style>'))

In [4]:
spark = sess.builder.appName('Dataframe').getOrCreate()

In [5]:
spark

In [6]:
spark.read.option('header', 'true').csv('stocks.csv').show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [7]:
df = spark.read.option('header', 'true').csv('stocks.csv', inferSchema = True)

In [8]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [9]:
filtered_spx = df.filter(df['Close'] < 500)

# Select the required columns
selected_columns = ['Open', 'Close', 'Volume']
filtered_spx = filtered_spx.select(*selected_columns)

# Show the resulting dataframe
filtered_spx.show()


+------------------+------------------+---------+
|              Open|             Close|   Volume|
+------------------+------------------+---------+
|        213.429998|        214.009998|123432400|
|        214.599998|        214.379993|150476200|
|        214.379993|        210.969995|138040000|
|            211.75|            210.58|119282800|
|        210.299994|211.98000499999998|111902700|
|212.79999700000002|210.11000299999998|115557400|
|209.18999499999998|        207.720001|148614900|
|        207.870005|        210.650002|151473000|
|210.11000299999998|            209.43|108223500|
|210.92999500000002|            205.93|148516900|
|        208.330002|        215.039995|182501900|
|        214.910006|            211.73|153038200|
|        212.079994|        208.069996|152038600|
|206.78000600000001|            197.75|220441900|
|202.51000200000001|        203.070002|266424900|
|205.95000100000001|        205.940001|466777500|
|        206.849995|        207.880005|430642100|


In [10]:
filtered_spx2 = df.filter((df['Open'] > 200) & (df['Close'] < 200))

# Select the required columns
selected_columns = ['Open', 'Close', 'Volume']
filtered_spx2 = filtered_spx2.select(*selected_columns)

# Show the resulting dataframe
filtered_spx2.show()


+------------------+----------+---------+
|              Open|     Close|   Volume|
+------------------+----------+---------+
|206.78000600000001|    197.75|220441900|
|        204.930004|199.289995|293375600|
|        201.079996|192.060003|311488100|
+------------------+----------+---------+



In [11]:
# Assuming you already have the 'spx' dataframe loaded

from pyspark.sql.functions import year, col

# Add a new column called 'Year' with the extracted year
df_with_year = df.withColumn('Year', year(col('Date')))

# Show the resulting dataframe with the new 'Year' column
df_with_year.show()


+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
|2010-01-11|212.

In [12]:
from pyspark.sql.functions import min

min_volumes_by_year = df_with_year.groupBy("year").agg(min("volume").alias("minVolume"))
min_volumes_by_year.show()

+----+---------+
|year|minVolume|
+----+---------+
|2015| 13046400|
|2013| 41888700|
|2014| 14479600|
|2012| 43938300|
|2016| 11475900|
|2010| 39373600|
|2011| 44915500|
+----+---------+



In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, max

# Create a Spark session
# Assuming you already have the DataFrame 'df' with the 'year' and 'month' columns
# (as calculated in the previous steps)

# Calculate the highest low price for each year and month
max_low_by_year_month = df.groupBy(year("Date").alias("year"), month("Date").alias("month")) \
    .agg(max("Low").alias("maxLow"))

# Show the resulting DataFrame
max_low_by_year_month.show()


+----+-----+------------------+
|year|month|            maxLow|
+----+-----+------------------+
|2012|   10|        665.550026|
|2010|    7|        260.300003|
|2010|   12|        325.099991|
|2015|    2|        131.169998|
|2014|    4|        589.799988|
|2015|   12|        117.809998|
|2016|    7|            103.68|
|2016|   11|        111.400002|
|2012|    8| 673.5400089999999|
|2013|    2|473.24997699999994|
|2012|    4| 626.0000150000001|
|2012|   12|        585.500023|
|2014|   10|        107.209999|
|2016|    5|             99.25|
|2014|   12|        115.290001|
|2013|    9|        503.479988|
|2013|   10|        525.110016|
|2014|    5|        628.900002|
|2016|    2|         96.650002|
|2013|   12| 566.4100269999999|
+----+-----+------------------+
only showing top 20 rows



In [16]:
from pyspark.sql.functions import mean, stddev

# Create a Spark session


# Assuming you already have the DataFrame 'df' with the 'highest' column
# (as calculated in the previous steps)

# Calculate mean and standard deviation for the 'highest' column
result = df.agg(mean("High").alias("mean_high_price"), stddev("High").alias("stddev_high_price"))

# Show the result with two decimal places
result.show(truncate=False)


+-----------------+------------------+
|mean_high_price  |stddev_high_price |
+-----------------+------------------+
|315.9112880164581|186.89817686485767|
+-----------------+------------------+

