In [58]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, max, min, format_number

# Start (or reuse) the Spark session for this notebook, registered under the app name "CompanyABC".
spark = (
    SparkSession.builder
    .appName("CompanyABC")
    .getOrCreate()
)

def extract(
    stock_path="CompanyABC_stock.csv",
    february_path="Sales_February_2019.csv",
    april_path="Sales_April_2019.csv",
):
    """Load the stock CSV and the two monthly sales CSVs into Spark DataFrames.

    Every file is read with a header row and with schema inference enabled.
    The February and April sales frames are combined by column name
    (February rows first) into a single DataFrame.

    Args:
        stock_path: Path to the stock CSV.
        february_path: Path to the February sales CSV.
        april_path: Path to the April sales CSV.

    Returns:
        A tuple ``(stock_df, feb_apr_df)``.
    """

    def _read_csv(path):
        # Single place for the reader options so all three files are parsed the same way.
        return spark.read.csv(path, header=True, inferSchema=True)

    stock_df = _read_csv(stock_path)
    february_sales_df = _read_csv(february_path)
    april_sales_df = _read_csv(april_path)

    # unionByName matches columns by name rather than by position.
    feb_apr_df = february_sales_df.unionByName(april_sales_df)
    return stock_df, feb_apr_df

# Run the extraction and bind the two returned dataframes.
stock_df, feb_apr_df = extract()

# Showing the first 5 rows of the stock data.
stock_df.take(5)

[Row(Date=datetime.date(2012, 1, 3), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date=datetime.date(2012, 1, 4), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date=datetime.date(2012, 1, 5), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
 Row(Date=datetime.date(2012, 1, 6), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922),
 Row(Date=datetime.date(2012, 1, 9), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)]

In [59]:
# Add an "HV Ratio" column: each row's "High" value divided by its "Volume" value.
hv_ratio = col("High") / col("Volume")
stock_df = stock_df.withColumn("HV Ratio", hv_ratio)
stock_df.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|            HV Ratio|
+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|7.367338463826307E-6|
|2012-01-09|         59.029999|   

In [60]:
# Sort by "High" in descending order and keep only the first row, i.e. the day with the highest "High".
# The price is printed rounded to 2 decimal places.
highest_day = (
    stock_df
    .sort(col("High").desc())
    .select("date", "high")
    .first()
)
print(f"The highest day was {highest_day['date']} at ${highest_day['high']:.2f}")

The highest day was 2015-01-13 at $90.97


In [61]:
# Aggregate the "Close" column down to its mean. The aggregation yields a single row
# with a single value, which is pulled out by position.
mean_close_row = stock_df.agg(mean("Close")).first()
avg_close = mean_close_row[0]
print(f"The average (mean) closing price is ${avg_close:.2f}")

The average (mean) closing price is $72.39


In [62]:
# Largest and smallest "Volume" values, each formatted with thousands separators and
# no decimals, then renamed for display. (max/min here are the pyspark.sql.functions versions.)
max_volume = format_number(max("Volume"), 0).alias("Max Volume")
min_volume = format_number(min("Volume"), 0).alias("Min Volume")
volume = stock_df.agg(max_volume, min_volume)
volume.show()

+----------+----------+
|Max Volume|Min Volume|
+----------+----------+
|80,898,100| 2,094,900|
+----------+----------+



In [63]:
# Count the rows whose "Close" value is below 70 (rows at 70 or higher are excluded).
stock_df.where(col("Close") < 70).count()

397

In [64]:
# Share of rows whose "High" exceeded 80: matching rows divided by all rows, times 100 to get a percent.
days_above_80 = stock_df.filter(stock_df["High"] > 80).count()
total_days = stock_df.count()
percent = days_above_80 / total_days * 100
print(f"The percentage of time that the 'High' was greater than 80 is {percent:.2f}%")

The percentage of time that the 'High' was greater than 80 is 9.14%


In [65]:
# Checking the combined February + April sales dataframe (first 20 rows, long values truncated).
feb_apr_df.show(n=20, truncate=True)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  150502|              iPhone|               1|     700.0|02/18/19 01:35|866 Spruce St, Po...|
|  150503|AA Batteries (4-p...|               1|      3.84|02/13/19 07:24|18 13th St, San F...|
|  150504|27in 4K Gaming Mo...|               1|    389.99|02/18/19 09:46|52 6th St, New Yo...|
|  150505|Lightning Chargin...|               1|     14.95|02/02/19 16:47|129 Cherry St, At...|
|  150506|AA Batteries (4-p...|               2|      3.84|02/28/19 20:32|548 Lincoln St, S...|
|  150507|Lightning Chargin...|               1|     14.95|02/24/19 18:50|387 12th St, Aust...|
|  150508|AA Batteries (4-p...|               1|      3.84|02/21/19 19:26|622 Center St, Sa...|
|  150509|Apple Airpods Hea...|         

In [66]:
# Add a "Total Price" column: "Price Each" multiplied by "Quantity Ordered", kept to 2 decimal places.
# The value is cast to DECIMAL instead of being passed through format_number, because
# format_number returns a *string* (e.g. "1,400.00"). A string column cannot be summed or
# sorted numerically and would be written to the database as text rather than as a number.
feb_apr_df = feb_apr_df.withColumn(
    "Total Price",
    (col("Price Each") * col("Quantity Ordered")).cast("decimal(12,2)"),
)

feb_apr_df.show()

+--------+--------------------+----------------+----------+--------------+--------------------+-----------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|Total Price|
+--------+--------------------+----------------+----------+--------------+--------------------+-----------+
|  150502|              iPhone|               1|     700.0|02/18/19 01:35|866 Spruce St, Po...|     700.00|
|  150503|AA Batteries (4-p...|               1|      3.84|02/13/19 07:24|18 13th St, San F...|       3.84|
|  150504|27in 4K Gaming Mo...|               1|    389.99|02/18/19 09:46|52 6th St, New Yo...|     389.99|
|  150505|Lightning Chargin...|               1|     14.95|02/02/19 16:47|129 Cherry St, At...|      14.95|
|  150506|AA Batteries (4-p...|               2|      3.84|02/28/19 20:32|548 Lincoln St, S...|       7.68|
|  150507|Lightning Chargin...|               1|     14.95|02/24/19 18:50|387 12th St, Aust...|      14.95|
|  150508|AA Batteries (4-p.

In MySQL, I ran "create database CompanyABC_DB;" to create the empty database before writing to it.

In [67]:
# Writing both dataframes to the MySQL database.
# Connection settings come from environment variables so that credentials are not
# stored in the notebook. The URL and user fall back to the local defaults.
JDBC_URL = os.environ.get("MYSQL_JDBC_URL", "jdbc:mysql://localhost:3306/CompanyABC_DB")
JDBC_USER = os.environ.get("MYSQL_USER", "root")
JDBC_PASSWORD = os.environ.get("MYSQL_PASSWORD")
if JDBC_PASSWORD is None:
    raise RuntimeError("Set the MYSQL_PASSWORD environment variable before writing to MySQL.")


def write_to_mysql(df, table, mode="append"):
    """Write a Spark DataFrame to a MySQL table over JDBC.

    Args:
        df: The DataFrame to write.
        table: Name of the destination table.
        mode: Spark save mode. The default "append" adds rows to an existing
            table, so re-running this cell inserts the same rows again.
    """
    (
        df.write
        .format("jdbc")
        .option("url", JDBC_URL)
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", table)
        .option("user", JDBC_USER)
        .option("password", JDBC_PASSWORD)
        .mode(mode)
        .save()
    )


# Writing stock_df to the SQL database
write_to_mysql(stock_df, "Company_Stock_Data")

# Writing the feb_apr_df to the SQL database
write_to_mysql(feb_apr_df, "Feb_Apr_Sales_Data")