### `Apache Spark (PySpark) - SBA 345`

### In this skill-based assignment (SBA), you will use PySpark to analyze datasets for the imaginary company “CompanyABC.” The datasets cover stock prices and sales, and you will answer some basic questions about each.


#### `Section one: CompanyABC Stock Data` 
##### Load/Read the CompanyABC stock data (CompanyABC_stock.csv) into a SparkSQL DataFrame (hint: read()).
##### After loading the data, you can inspect it using the code below.

In [2]:
import pyspark  # Import the required packages
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSV Reader").getOrCreate()
# header=True treats the first row as column names; inferSchema=True detects column types.
company_df = spark.read.load(r"C:\Users\CompanyABC_stock.csv", format="csv", header=True, inferSchema=True)
# The line above returns the data as a DataFrame.

company_df.printSchema()

company_df.show()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|   

##### Inspecting the data

In [3]:
company_df.columns
company_df.printSchema()
company_df.describe().show()
company_df.show()



root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.3899989999

##### Print the first five rows

In [4]:
company_df.count() # Return number of rows 
company_df.printSchema() #Return schema 
company_df.show(5) # Showing first five rows

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58

##### Create a new DataFrame column called "HV Ratio", which represents the ratio of the High price to the total Volume of stock traded on that day.

In [5]:
from pyspark.sql.functions import col
company_df = company_df.withColumn("HV Ratio", col("High") / col("Volume")) 
company_df.select("High", "Volume", "HV Ratio").show() 

+------------------+--------+--------------------+
|              High|  Volume|            HV Ratio|
+------------------+--------+--------------------+
|         61.060001|12668800|4.819714653321546E-6|
|         60.349998| 9593300|6.290848613094555E-6|
|         59.619999|12768200|4.669412994783916E-6|
|         59.450001| 8069400|7.367338463826307E-6|
|         59.549999| 6679300|8.915604778943901E-6|
|59.709998999999996| 6907300|8.644477436914568E-6|
|         59.529999| 6365600|9.351828421515645E-6|
|              60.0| 7236400| 8.29141562102703E-6|
|59.610001000000004| 7729300|7.712212102001476E-6|
|60.110001000000004| 8500000|7.071764823529412E-6|
|         60.029999| 5911400|1.015495466386981E-5|
|             60.73| 9234600|6.576354146362592...|
|             61.25|10378800| 5.90145296180676E-6|
|             60.98| 7134100|8.547679455011844E-6|
|              62.0| 7362800|8.420709512685392E-6|
|61.610001000000004| 5915800|1.041448341728929...|
|             61.84| 7436200|8.
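
As a quick sanity check, the first few HV Ratio values can be reproduced without Spark. The sketch below is plain Python and not part of the assignment. The variable names are illustrative, and the (High, Volume) pairs are copied from the first three rows of the output above.

```python
# (High, Volume) pairs copied from the first three rows shown above.
rows = [
    (61.060001, 12668800),
    (60.349998, 9593300),
    (59.619999, 12768200),
]

# Same formula as the withColumn call: High divided by Volume.
hv_ratios = [high / volume for high, volume in rows]

for (high, volume), ratio in zip(rows, hv_ratios):
    print(f"{high} / {volume} = {ratio:.6e}")
```

The ratios are on the order of 1e-6 because High is a price in dollars while Volume counts millions of shares.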

##### Find out on what day the stock price was the highest. (Hint: use the High column) 

In [6]:
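from pyspark.sql.functions import col, max as max_  # alias avoids shadowing Python's built-in max
# Step 1: find the largest value in the High column.
max_high = company_df.agg(max_("High")).collect()[0][0]
# Step 2: keep the row(s) whose High equals that value, then show the date.
highest_day_df = company_df.filter(col("High") == max_high)
highest_day_df.select("Date", "High").show()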

+----------+---------+
|      Date|     High|
+----------+---------+
|2015-01-13|90.970001|
+----------+---------+
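
The two-step pattern above (find the maximum, then filter on it) can be illustrated in plain Python. This is only a sketch on the five rows displayed earlier, not on the full dataset, and the names `rows`, `all_max` and `best` are illustrative.

```python
# (Date, High) pairs copied from the first five rows shown earlier.
rows = [
    ("2012-01-03", 61.060001),
    ("2012-01-04", 60.349998),
    ("2012-01-05", 59.619999),
    ("2012-01-06", 59.450001),
    ("2012-01-09", 59.549999),
]

# Two-step approach, mirroring agg(max) followed by filter.
max_high = max(high for _, high in rows)
all_max = [row for row in rows if row[1] == max_high]

# One-step approach: pick the single row with the largest High.
best = max(rows, key=lambda row: row[1])

print(all_max)
print(best)
```

If several days shared the same maximum, the filter would keep all of them, while `max(..., key=...)` returns only the first one it encounters.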



##### What is the average (mean) closing price? (Hint: Use the Close column) 

In [7]:
from pyspark.sql.functions import avg

company_df.select(avg("Close")).show()


+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



##### What are the maximum and minimum volumes of stock traded? (Hint: Use the max() and min() functions)

In [8]:
from pyspark.sql.functions import max, min

company_df.select(
    max("Volume").alias("Max Volume"),
    min("Volume").alias("Min Volume")
).show()


+----------+----------+
|Max Volume|Min Volume|
+----------+----------+
|  80898100|   2094900|
+----------+----------+



##### For how many days was the closing value less than 70 dollars? (Hint: Use the count() method) 

In [9]:
num_days = company_df.filter(company_df["Close"] < 70).count()

print(f"Number of days with closing value less than $70: {num_days}")


Number of days with closing value less than $70: 397


##### What percentage of the time was the High greater than 80 dollars? (Hint: (Number of days with High > 80) / (Total days in the dataset))

In [10]:

# Number of days on which High exceeded $80.
days_high_greater_80 = company_df.filter(company_df["High"] > 80).count()

total_days = company_df.count()

percentage = (days_high_greater_80 / total_days) * 100

print(f"Percentage of days with High > $80: {percentage:.2f}%")


Percentage of days with High > $80: 9.14%
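
The percentage can be cross-checked with plain arithmetic. The sketch below assumes the total number of rows equals the count of 1258 reported by describe() earlier. It then searches for every whole number of days that would print as 9.14%.

```python
total_days = 1258        # count reported by describe() (assumed equal to the row count)
reported_pct = "9.14"    # value printed by the cell above

# Apply the same formula and formatting to every possible day count.
consistent = [
    n for n in range(total_days + 1)
    if f"{n / total_days * 100:.2f}" == reported_pct
]

print(consistent)  # [115]
```

Under that assumption, only one count fits the printed result: the High exceeded $80 on 115 days.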


##### Create a database named CompanyABC_DB using SQL (MySQL Workbench).
##### Load/Write the CompanyABC_stock.csv data from the SparkSQL DataFrame into the CompanyABC_DB database. You can choose any table name for this data.

In [11]:
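from pyspark.sql import SparkSession

# spark.jars must point to the MySQL Connector/J JAR (the JDBC driver), not to the CSV file.
# getOrCreate() reuses a session that is already running, in which case this setting
# may not be applied; set it when the session is first created.
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", "C:/path/to/mysql-connector-java-8.0.xx.jar") \
    .getOrCreate()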


In [12]:
company_df.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("url", "jdbc:mysql://localhost:3306/CompanyABC_DB") \
    .option("dbtable", "money_data") \
    .option("user", "root") \
    .option("password", "password") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .save()
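
Because the same connection settings are needed for every table written to CompanyABC_DB, they can be collected in one place. The sketch below is a hypothetical helper and not part of the assignment. The function name `jdbc_options` and the environment variable names `MYSQL_USER` and `MYSQL_PASSWORD` are assumptions made for illustration. It builds only the option dictionary, so it runs without Spark or a database.

```python
import os


def jdbc_options(table, database="CompanyABC_DB"):
    """Build the options for a Spark JDBC read or write (hypothetical helper)."""
    return {
        "url": f"jdbc:mysql://localhost:3306/{database}",
        "dbtable": table,
        # Read credentials from the environment if set, else fall back to the
        # values used in the cell above.
        "user": os.environ.get("MYSQL_USER", "root"),
        "password": os.environ.get("MYSQL_PASSWORD", "password"),
        "driver": "com.mysql.cj.jdbc.Driver",
    }


stock_opts = jdbc_options("money_data")
print(stock_opts["url"])

# With a live Spark session and a running MySQL server, the write would become:
# company_df.write.format("jdbc").mode("overwrite").options(**stock_opts).save()
```

Passing the table name as an argument makes it harder to overwrite one table with another DataFrame by accident.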


#### `Section two: CompanyABC Sales Data`

##### Load/Read both CompanyABC sales datasets (Sales_April_2019.csv and Sales_February_2019.csv) into a single SparkSQL DataFrame (hint: read()). You can inspect the data using the lines below:


In [13]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("CompanyABC Sales") \
    .getOrCreate()

april_df = spark.read.option("header", "true").csv(r"C:\Users\Sales_April_2019.csv")
feb_df = spark.read.option("header", "true").csv(r"C:\Users\Sales_February_2019.csv")

# union() matches columns by position, so both files must share the same column order.
sales_df = april_df.union(feb_df)

# Inspect the data
sales_df.show(5)
sales_df.printSchema()


+--------+--------------------+----------------+----------+---------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|     Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+---------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95| 4/19/2019 8:46|917 1st St, Dalla...|
|  176559|Bose SoundSport H...|               1|     99.99| 4/7/2019 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|       600|4/12/2019 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|4/12/2019 14:38|669 Spruce St, Lo...|
|  176561|    Wired Headphones|               1|     11.99| 4/30/2019 9:27|333 8th St, Los A...|
+--------+--------------------+----------------+----------+---------------+--------------------+
only showing top 5 rows

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Orde

In [14]:
april_df.columns
feb_df.columns
april_df.printSchema()
feb_df.printSchema()
april_df.describe().show() 
feb_df.describe().show() 
april_df.show()
feb_df.show() 



root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)

+-------+------------------+------------+-------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|   Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+-------------------+------------------+--------------+--------------------+
|  count|             18324|       18324|              18324|             18324|         18324|               18324|
| 

In [15]:
from pyspark.sql.functions import col
sales_df = sales_df.withColumn(
    "Total price",
    (col("Quantity Ordered").cast("float") * col("Price Each").cast("float"))
)
sales_df.select("Quantity Ordered", "Price Each", "Total price").show(5)



+----------------+----------+-----------+
|Quantity Ordered|Price Each|Total price|
+----------------+----------+-----------+
|               2|     11.95|       23.9|
|               1|     99.99|      99.99|
|               1|       600|      600.0|
|               1|     11.99|      11.99|
|               1|     11.99|      11.99|
+----------------+----------+-----------+
only showing top 5 rows
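
One detail of the cast above: "float" in Spark is a 32-bit type, so prices such as 11.95 are stored only approximately. The plain-Python sketch below illustrates the effect by rounding a value to 32 bits with the standard `struct` module and comparing it with exact decimal arithmetic. The helper name `as_float32` is illustrative. Casting to "double" or to a decimal type in Spark would be the analogous alternative, which is a suggestion rather than something the assignment requires.

```python
import struct
from decimal import Decimal


def as_float32(x):
    """Round a Python float (64-bit) to the nearest 32-bit float value."""
    return struct.unpack("f", struct.pack("f", x))[0]


price32 = as_float32(11.95)       # price as a 32-bit float would store it
total32 = 2 * price32             # quantity 2, as in the first row above
total_dec = 2 * Decimal("11.95")  # exact decimal arithmetic

print(price32)
print(total32)
print(total_dec)
```

The displayed totals still look correct because the error is tiny per row, but it can accumulate when many rows are summed.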



In [16]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("WriteSalesData") \
    .config("spark.jars", "C:/path/to/mysql-connector-java-8.0.xx.jar") \
    .getOrCreate()


In [17]:
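# Write to a separate table: reusing "money_data" with mode("overwrite") would replace
# the stock table written earlier. The name "sales_data" is arbitrary.
sales_df.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("url", "jdbc:mysql://localhost:3306/CompanyABC_DB") \
    .option("dbtable", "sales_data") \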
    .option("user", "root") \
    .option("password", "password") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .save()
