In [1]:
#Importing Spark Library
import findspark
findspark.init()

In [2]:
#Import DataFrame library
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
        .appName("Walmart_SparkSQL_Analysis.com") \
        .master("local[1]") \
        .config('spark.jars' , 'mysql-connector-j-8.0.32.jar') \
        .getOrCreate()

In [4]:
#Creating pipeline to read data from Source[MySQL]
walmart_stock = spark.read.format("jdbc") \
                    .option("driver", "com.mysql.cj.jdbc.Driver") \
                    .option("url", "jdbc:mysql://localhost:3306/walmart") \
                    .option("dbtable","walmart_stock") \
                    .option("user","root") \
                    .option("password","Kaviya@2106") \
                    .load()

In [5]:
#View the content of the dataset
walmart_stock.show()

+-------------------+-----+-----+-----+-----+--------+------------------+
|               date| open| high|close|  low|  volume|         adj_close|
+-------------------+-----+-----+-----+-----+--------+------------------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9593300|         52.078475|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12768200|         51.825539|
|2012-01-06 00:00:00|59.42|59.45|58.87| 59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|59.03|59.55|58.92|59.18| 6679300|51.616215000000004|
|2012-01-10 00:00:00|59.43|59.71|58.98|59.04| 6907300|         51.494109|
|2012-01-11 00:00:00|59.06|59.53|59.04| 59.4| 6365600|         51.808098|
|2012-01-12 00:00:00|59.79| 60.0| 59.4| 59.5| 7236400|51.895315999999994|
|2012-01-13 00:00:00|59.18|59.61|59.01|59.54| 7729300|51.930203999999996|
|2012-01-17 00:00:00|59.87|60.11|59.52|59.85| 8500000|         52.200581|
|2012-01-18 00:00:00|59.79|60.03|59.65

In [6]:
#Describe the dataframe
walmart_stock.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- close: double (nullable = true)
 |-- low: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- adj_close: double (nullable = true)



In [7]:
#SparkSQL analysis
walmart_stock.createOrReplaceTempView("walmart")

## 1. Create a new dataframe with a column called VH Ratio that is the ratio of the volume versus High Price of stock traded for a day.

In [8]:
#Volume/High
12668800/61.06

207481.1660661644

In [9]:
spark.sql("SELECT date,ROUND((volume/high),2)AS VH_ratio FROM walmart").show()

+-------------------+---------+
|               date| VH_ratio|
+-------------------+---------+
|2012-01-03 00:00:00|207481.17|
|2012-01-04 00:00:00|158961.06|
|2012-01-05 00:00:00|214159.68|
|2012-01-06 00:00:00|135734.23|
|2012-01-09 00:00:00|112162.89|
|2012-01-10 00:00:00|115680.79|
|2012-01-11 00:00:00|106930.96|
|2012-01-12 00:00:00|120606.67|
|2012-01-13 00:00:00|129664.49|
|2012-01-17 00:00:00|141407.42|
|2012-01-18 00:00:00|  98474.1|
|2012-01-19 00:00:00|152059.94|
|2012-01-20 00:00:00| 169449.8|
|2012-01-23 00:00:00|116990.82|
|2012-01-24 00:00:00|118754.84|
|2012-01-25 00:00:00| 96020.13|
|2012-01-26 00:00:00|120249.03|
|2012-01-27 00:00:00|102868.13|
|2012-01-30 00:00:00|124541.75|
|2012-01-31 00:00:00|158543.12|
+-------------------+---------+
only showing top 20 rows



In [10]:
VH_ratio = spark.sql("SELECT date,ROUND((volume/high),2)AS VH_ratio FROM walmart")

## 2. What day had the Peak High in Price?

In [11]:
spark.sql("SELECT date,high FROM walmart ORDER BY high DESC LIMIT 10").show()

+-------------------+-----+
|               date| high|
+-------------------+-----+
|2015-01-13 00:00:00|90.97|
|2015-01-08 00:00:00|90.67|
|2015-01-09 00:00:00|90.39|
|2015-01-12 00:00:00|90.31|
|2015-01-23 00:00:00|89.26|
|2015-01-26 00:00:00|89.16|
|2015-01-07 00:00:00|88.68|
|2015-01-14 00:00:00|88.52|
|2015-01-27 00:00:00|88.46|
|2015-01-22 00:00:00| 88.4|
+-------------------+-----+



In [12]:
peak_high_price = spark.sql("SELECT date,high FROM walmart ORDER BY high DESC LIMIT 10")

## 3. What is the max and min of the Volume column?

In [13]:
spark.sql("SELECT MAX(volume)AS max_volume,MIN(volume)AS min_volume FROM walmart").show()

+----------+----------+
|max_volume|min_volume|
+----------+----------+
|  80898100|   2094900|
+----------+----------+



In [14]:
max_min_vol = spark.sql("SELECT MAX(volume)AS max_volume,MIN(volume)AS min_volume FROM walmart")

## 4. How many days was the Close lower than 60 dollars?

In [15]:
spark.sql("SELECT date,close FROM walmart WHERE close < 60").show()

+-------------------+-----+
|               date|close|
+-------------------+-----+
|2012-01-03 00:00:00|59.87|
|2012-01-04 00:00:00|59.47|
|2012-01-05 00:00:00|58.37|
|2012-01-06 00:00:00|58.87|
|2012-01-09 00:00:00|58.92|
|2012-01-10 00:00:00|58.98|
|2012-01-11 00:00:00|59.04|
|2012-01-12 00:00:00| 59.4|
|2012-01-13 00:00:00|59.01|
|2012-01-17 00:00:00|59.52|
|2012-01-18 00:00:00|59.65|
|2012-01-19 00:00:00|59.75|
|2012-02-21 00:00:00|59.67|
|2012-02-22 00:00:00|58.37|
|2012-02-23 00:00:00|58.21|
|2012-02-24 00:00:00| 58.5|
|2012-02-27 00:00:00|58.29|
|2012-02-28 00:00:00|58.35|
|2012-02-29 00:00:00|58.72|
|2012-03-01 00:00:00|58.64|
+-------------------+-----+
only showing top 20 rows



In [16]:
min60_close_days = spark.sql("SELECT date,close FROM walmart WHERE close < 60")

## 5. What percentage of the time was the High greater than 80 dollars ?
## In other words, (Number of Days High>80)/(Total Days in the dataset)


In [17]:
spark.sql("SELECT COUNT(date) AS percentage FROM walmart WHERE high > 80").show()

+----------+
|percentage|
+----------+
|       115|
+----------+



In [18]:
no_of_days = spark.sql("SELECT COUNT(date)AS no_of_days FROM walmart WHERE high > 80")

In [19]:
spark.sql("SELECT COUNT(*)AS tot_days FROM walmart").show()

+--------+
|tot_days|
+--------+
|    1258|
+--------+



In [20]:
tot_days = spark.sql("SELECT COUNT(*)AS tot_days FROM walmart")

In [21]:
115/1258 * 100

9.141494435612083

In [22]:
#Method - 1
spark.sql("SELECT ROUND(((SELECT COUNT(date) FROM walmart WHERE high > 80)/(SELECT COUNT(*) FROM walmart))*100,3) AS percentage FROM walmart LIMIT 1").show()

+----------+
|percentage|
+----------+
|     9.141|
+----------+



In [23]:
#Method - 2
spark.sql("SELECT (sum(if(high>80,1,0)) / count(*)) * 100 AS percentage FROM walmart").show()

+-----------------+
|       percentage|
+-----------------+
|9.141494435612083|
+-----------------+



In [24]:
percentage = spark.sql("SELECT (sum(if(high>80,1,0)) / count(*)) * 100 AS percentage FROM walmart")

## 6. What is the Pearson correlation between High and Volume?

In [25]:
from pyspark.sql.functions import corr
spark.sql("SELECT corr(high, volume) AS pearson_correlation FROM walmart").show()

+--------------------+
| pearson_correlation|
+--------------------+
|-0.33843260582148915|
+--------------------+



In [26]:
pearson_correlation = spark.sql("SELECT corr(high, volume) AS pearson_correlation FROM walmart")

## 7. What is the max High per year?

In [27]:
spark.sql("SELECT YEAR(date)AS year, MAX(high)AS max_high FROM walmart GROUP BY year ORDER BY year").show()

+----+--------+
|year|max_high|
+----+--------+
|2012|    77.6|
|2013|   81.37|
|2014|   88.09|
|2015|   90.97|
|2016|   75.19|
+----+--------+



In [28]:
year_maxhigh = spark.sql("SELECT YEAR(date)AS year, MAX(high)AS max_high FROM walmart GROUP BY year ORDER BY year")

## 8. What is the average Close for each Calendar Month?

In [29]:
spark.sql("SELECT MONTH(date)AS month,ROUND(AVG(close),3)AS avg_close FROM walmart GROUP BY month ORDER BY month" ).show()

+-----+---------+
|month|avg_close|
+-----+---------+
|    1|   70.904|
|    2|   70.694|
|    3|   71.319|
|    4|   72.521|
|    5|   71.853|
|    6|   72.122|
|    7|   74.005|
|    8|   72.595|
|    9|   71.763|
|   10|    71.08|
|   11|   71.546|
|   12|   72.445|
+-----+---------+



In [30]:
month_avgclose = spark.sql("SELECT MONTH(date)AS month,AVG(close)AS avg_close FROM walmart GROUP BY month ORDER BY month")

In [31]:
#Importing data to client[MySQL]

In [32]:
VH_ratio.write.format("jdbc") \
                    .option("driver", "com.mysql.cj.jdbc.Driver") \
                    .option("url", "jdbc:mysql://localhost:3306/walmart") \
                    .option("dbtable","res1") \
                    .option("user", "root").option("password", "cloudera") \
                    .save()

In [33]:
peak_high_price.write.format("jdbc") \
                    .option("driver", "com.mysql.cj.jdbc.Driver") \
                    .option("url", "jdbc:mysql://localhost:3306/walmart") \
                    .option("dbtable","res2") \
                    .option("user", "root").option("password", "cloudera") \
                    .save()

In [34]:
max_min_vol.write.format("jdbc") \
                    .option("driver", "com.mysql.cj.jdbc.Driver") \
                    .option("url", "jdbc:mysql://localhost:3306/walmart") \
                    .option("dbtable","res3") \
                    .option("user", "root").option("password", "cloudera") \
                    .save()

In [35]:
min60_close_days.write.format("jdbc") \
                    .option("driver", "com.mysql.cj.jdbc.Driver") \
                    .option("url", "jdbc:mysql://localhost:3306/walmart") \
                    .option("dbtable","res4") \
                    .option("user", "root").option("password", "cloudera") \
                    .save()

In [36]:
res5 = percentage.write.format("jdbc") \
                    .option("driver", "com.mysql.cj.jdbc.Driver") \
                    .option("url", "jdbc:mysql://localhost:3306/walmart") \
                    .option("dbtable","res5") \
                    .option("user", "root").option("password", "cloudera") \
                    .save()

In [38]:
pearson_correlation.write.format("jdbc") \
                    .option("driver", "com.mysql.cj.jdbc.Driver") \
                    .option("url", "jdbc:mysql://localhost:3306/walmart") \
                    .option("dbtable","res6") \
                    .option("user", "root").option("password", "cloudera") \

In [39]:
 year_maxhigh.write.format("jdbc") \
                    .option("driver", "com.mysql.cj.jdbc.Driver") \
                    .option("url", "jdbc:mysql://localhost:3306/walmart") \
                    .option("dbtable","res7") \
                    .option("user","root") \
                    .option("password","Kaviya@2106") \
                    .save()

In [40]:
month_avgclose.write.format("jdbc") \
                    .option("driver", "com.mysql.cj.jdbc.Driver") \
                    .option("url", "jdbc:mysql://localhost:3306/walmart") \
                    .option("dbtable","res8") \
                    .option("user","root") \
                    .option("password","Kaviya@2106") \
                    .save()