In [1]:
import findspark
findspark.init()

import pyspark

In [2]:
sc=pyspark.SparkContext(appName='hw5')

In [3]:
sc

In [4]:
lines=sc.textFile("csds-material/csds-material/input/file1")
lines

csds-material/csds-material/input/file1 MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
lines.collect()

['Hello World Bye World']

In [6]:
words=lines.flatMap(lambda l:l.split(" "))

In [7]:
words.count()

4

### **Problem 1**

In [11]:
lines=sc.wholeTextFiles("csds-material/csds-material/input")
counts=lines.flatMap(lambda x:x[1].split(' ')) \
.map(lambda x: x.strip())\
.map(str) \
.map(lambda x: (x,1)) \
.reduceByKey(lambda a,b:a+b)
output=counts.collect()
for(word, count) in output:
    print("%s: %i" % (word, count))

Hello: 2
Goodbye: 1
World: 2
Bye: 1
Hadoop: 2


### **Problem 2**

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("hw5sql") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [12]:
def parse_line(line):
    fields=line.split(',')
    p0=fields[0]
    p1=fields[1]
    p2=fields[2]
    p3=float(fields[3])
    p4=fields[4]
    return (p0,p1,p2,p3,p4)

In [16]:
sc=spark.sparkContext
lines=sc.textFile("csds-material/csds-material/hive/purchases.txt")

# Each line is converted to a tuple
purchase=lines.map(parse_line)

In [18]:
field1=StructField('timestamp',StringType(),True)
field2=StructField('location',StringType(),True)
field3=StructField('category',StringType(),True)
field4=StructField('price',FloatType(),True)
field5=StructField('card',StringType(),True)

fields=[field1,field2,field3,field4,field5]
schema=StructType(fields)

schemaPurchase=spark.createDataFrame(purchase,schema)

In [19]:
schemaPurchase.createOrReplaceTempView("purchase")

#### **What is the average price of the products that were purchased via Mastercard?**

In [20]:
spark.sql("SELECT AVG(price) FROM purchase WHERE card='MasterCard'").show()

+-----------------+
|       avg(price)|
+-----------------+
|275.0677317417774|
+-----------------+



In [40]:
spark.sql("SELECT timestamp, sum(price) FROM \
(SELECT CAST(timestamp AS DATE), price FROM purchase) \
group by timestamp order by sum(price) desc limit 1").show()



+----------+-----------------+
| timestamp|       sum(price)|
+----------+-----------------+
|2012-03-17|2384.480026245117|
+----------+-----------------+



#### **What is the minimum value of a product under the Computers category?**

In [42]:
spark.sql("SELECT min(price) FROM purchase WHERE category='Computers'").show()

+----------+
|min(price)|
+----------+
|      0.38|
+----------+



#### **How many distinct categories of products are there?**

In [43]:
spark.sql("SELECT count(distinct(category)) as distinct_category_number FROM purchase").show()

+------------------------+
|distinct_category_number|
+------------------------+
|                      18|
+------------------------+



#### **Which store location had the lowest total sales?**

In [44]:
spark.sql("SELECT location as lowest_sale_location,sum(price) as total_price FROM purchase \
GROUP BY location ORDER BY total_price limit 1").show()

+--------------------+-----------------+
|lowest_sale_location|      total_price|
+--------------------+-----------------+
|               Plano|784.9599838256836|
+--------------------+-----------------+



### **Problem 3**

In [45]:
purchase.take(5)

[('2012-07-20 09:59:00', 'Corpus Christi', 'CDs', 327.91, 'Cash'),
 ('2012-03-11 17:29:00', 'Durham', 'Books', 115.09, 'Discover'),
 ('2012-07-31 11:43:00', 'Rochester', 'Toys', 332.07, 'MasterCard'),
 ('2012-06-18 14:47:00', 'Garland', 'Computers', 31.99, 'Visa'),
 ('2012-03-27 11:40:00', 'Tulsa', 'CDs', 452.18, 'Discover')]

In [46]:
purchaseDF=purchase.toDF(['timestamp','location','category','price','card'])

In [47]:
purchaseDF.show(5)

+-------------------+--------------+---------+------+----------+
|          timestamp|      location| category| price|      card|
+-------------------+--------------+---------+------+----------+
|2012-07-20 09:59:00|Corpus Christi|      CDs|327.91|      Cash|
|2012-03-11 17:29:00|        Durham|    Books|115.09|  Discover|
|2012-07-31 11:43:00|     Rochester|     Toys|332.07|MasterCard|
|2012-06-18 14:47:00|       Garland|Computers| 31.99|      Visa|
|2012-03-27 11:40:00|         Tulsa|      CDs|452.18|  Discover|
+-------------------+--------------+---------+------+----------+
only showing top 5 rows



In [48]:
from pyspark.sql.functions import *

#### **What is the average price of the products that were purchased via Mastercard?**

In [49]:
purchaseDF.filter(purchaseDF.card=='MasterCard').agg(avg(col('price'))).show()

+-----------------+
|       avg(price)|
+-----------------+
|275.0677319587629|
+-----------------+



#### **Which date recorded the highest total sales?**

In [50]:
purchaseDF.withColumn('sales_date',purchaseDF['timestamp'].cast(TimestampType()).cast(DateType()))\
.select('sales_date','price').groupBy('sales_date').agg({'price': 'sum'}).orderBy(desc('SUM(price)')).first()

Row(sales_date=datetime.date(2012, 3, 17), sum(price)=2384.48)

#### **What is the minimum value of a product under the Computers category?**

In [51]:
purchaseDF.filter(purchaseDF.category=='Computers').agg(min(col('price'))).show()

+----------+
|min(price)|
+----------+
|      0.38|
+----------+



#### **How many distinct categories of products are there?**

In [52]:
purchaseDF.select('category').distinct().count()

18

#### **Which store location had the lowest total sales?**

In [53]:
purchaseDF.select('location','price').groupBy('location').agg({'price': 'sum'}).orderBy('SUM(price)').first()

Row(location='Plano', sum(price)=784.96)