In [1]:
# Part - one
readFile = sc.wholeTextFiles ('input/*')
def lambdaFunc(input):
    input = input.replace('\n','')
    input = input.split(" ")
    return input
output = readFile.flatMap(lambda f: lambdaFunc(f[1])).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
output.saveAsTextFile('output/output1')

In [2]:
# Part - two
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
lines = sc.textFile('hive/purchases.txt')
parts = lines.map(lambda l: l.split(","))
fields = parts.map(lambda p: Row(timestamp=p[0], location=p[1],category=p[2],price=float(p[3]),card=p[4]))
schemaPurchase = spark.createDataFrame(fields)
schemaPurchase.createOrReplaceTempView("purchase")

In [3]:
# What is the average price of the products that were purchased via Mastercard?
avgmsprice = spark.sql("SELECT AVG(price) as avgprice FROM purchase WHERE card='MasterCard'")
avgmsprice.collect()

[Row(avgprice=275.0677319587629)]

In [4]:
# Which date recorded the highest total sales?
highestsale = spark.sql("SELECT DATE_FORMAT(timestamp, 'yyyy-MM-dd') as t, SUM(price) as totalPrice FROM purchase group by t order by totalPrice desc limit 1")
highestsale.collect()

[Row(t='2012-03-17', totalPrice=2384.48)]

In [5]:
# What is the minimum value of a product under the Computers category?
minComputer = spark.sql("SELECT price FROM purchase WHERE category='Computers' order by price asc limit 1")
minComputer.collect()

[Row(price=0.38)]

In [6]:
# How many distinct categories of products are there?
categories = spark.sql("SELECT DISTINCT(category) FROM purchase")
categories.collect()

[Row(category="Children's Clothing"),
 Row(category='Sporting Goods'),
 Row(category='CDs'),
 Row(category='Computers'),
 Row(category='Consumer Electronics'),
 Row(category='Health and Beauty'),
 Row(category='Pet Supplies'),
 Row(category='DVDs'),
 Row(category='Baby'),
 Row(category='Crafts'),
 Row(category="Women's Clothing"),
 Row(category='Video Games'),
 Row(category='Books'),
 Row(category='Music'),
 Row(category="Men's Clothing"),
 Row(category='Garden'),
 Row(category='Cameras'),
 Row(category='Toys')]

In [6]:
# Which store location had the lowest total sales?
locations = spark.sql("SELECT location, sum(price) as totalPrice FROM purchase group by location order by totalPrice asc limit 1")
locations.collect()

[Row(location='Plano', totalPrice=784.96)]

In [7]:
# Part - three
purchaseFile = sc.textFile('hive/purchases.txt')
purchases = purchaseFile.map(lambda lines:tuple(lines.split(",")))
purchases.take(5)
purchasesDF = purchases.toDF(['timestamp', 'location','category','price','card'])
purchasesDF.show(5)
from pyspark.sql.functions import *

+-------------------+--------------+---------+------+----------+
|          timestamp|      location| category| price|      card|
+-------------------+--------------+---------+------+----------+
|2012-07-20 09:59:00|Corpus Christi|      CDs|327.91|      Cash|
|2012-03-11 17:29:00|        Durham|    Books|115.09|  Discover|
|2012-07-31 11:43:00|     Rochester|     Toys|332.07|MasterCard|
|2012-06-18 14:47:00|       Garland|Computers| 31.99|      Visa|
|2012-03-27 11:40:00|         Tulsa|      CDs|452.18|  Discover|
+-------------------+--------------+---------+------+----------+
only showing top 5 rows



In [8]:
# What is the average price of the products that were purchased via Mastercard?
purchasesDF.filter(purchasesDF.card == 'MasterCard').agg(avg(col('price'))).show()

+-----------------+
|       avg(price)|
+-----------------+
|275.0677319587629|
+-----------------+



In [9]:
# Which date recorded the highest total sales?
purchasesDF.select(date_format(col("timestamp"),"yyyy-MM-dd").alias("date"), col("price")).groupBy(col("date")).agg(sum(col("price"))).sort(col("sum(price)").desc()).show(1)

+----------+----------+
|      date|sum(price)|
+----------+----------+
|2012-03-17|   2384.48|
+----------+----------+
only showing top 1 row



In [10]:
# What is the minimum value of a product under the Computers category?
purchasesDF.filter(purchasesDF.category == 'Computers').sort(col("price")).show(1)

+-------------------+---------+---------+-----+----+
|          timestamp| location| category|price|card|
+-------------------+---------+---------+-----+----+
|2012-05-30 13:26:00|St. Louis|Computers| 0.38|Amex|
+-------------------+---------+---------+-----+----+
only showing top 1 row



In [11]:
# How many distinct categories of products are there?
purchasesDF.select(purchasesDF.category).distinct().show()

+--------------------+
|            category|
+--------------------+
| Children's Clothing|
|      Sporting Goods|
|                 CDs|
|           Computers|
|Consumer Electronics|
|   Health and Beauty|
|        Pet Supplies|
|                DVDs|
|                Baby|
|              Crafts|
|    Women's Clothing|
|         Video Games|
|               Books|
|               Music|
|      Men's Clothing|
|              Garden|
|             Cameras|
|                Toys|
+--------------------+



In [12]:
# Which store location had the lowest total sales?
purchasesDF.groupBy("location").agg(sum(col("price"))).sort(col("sum(price)")).show(1)

+--------+----------+
|location|sum(price)|
+--------+----------+
|   Plano|    784.96|
+--------+----------+
only showing top 1 row

