## Problem 1 (30%)
Consider two attached text files: *bible.txt* and *4300.txt*. The first contains the ASCII text of the King James Bible and the other the text of James Joyce's novel Ulysses.

### i) Download stop words
Download and parse a list of *stop words* from the web page: http://www.lextek.com/manuals/onix/stopwords1.html.

In [None]:
# Load libraries
import requests
from bs4 import BeautifulSoup

# Download page
page = requests.get("http://www.lextek.com/manuals/onix/stopwords1.html")

# Parse page: take the text of the <pre> element and split it on whitespace
html = BeautifulSoup(page.content, 'html.parser').pre
text = html.get_text().split()

# Remove introduction (skip the first 21 tokens)
stopwords = text[21:]

# Export data to a data file, one stop word per line
with open("stopwords.csv", 'w') as result_file:
  for word in stopwords:
    result_file.write(word + "\n")

### ii) RDD word number pairs
Use Spark transformation and action functions present in the *RDD API* to transform those texts into RDDs that contain words and the number of occurrences of those words in the respective text. From the King James Bible, eliminate all verse numbers of the form *03:019:024*. Eliminate the so-called *stop words* from both RDDs. List the 30 most frequent words in each RDD (text).

#### Cleanup function

In [3]:
# Cleanup function
import re # Import regex library

def clean_up(rdd_words):
  rdd_words_clean1 = re.sub(r'(03:019:024)', '', rdd_words) # Removes only this one literal verse number
  rdd_words_clean2 = re.sub(r'([^A-Za-z0-9\s+])', '', rdd_words_clean1) # Drop everything except letters, digits, whitespace and '+'
  rdd_words_split = rdd_words_clean2.split(' ') # Split the line into words
  return [word.lower() for word in rdd_words_split if word != ''] # Lower case, skip empty strings
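
The pattern in `clean_up` matches only the literal string `03:019:024`, so every other verse number survives as a digit-only token once the colons are stripped. A minimal sketch of a more general pattern, assuming every verse number has the shape two digits, three digits, three digits separated by colons (the helper name `strip_verse_numbers` is hypothetical and not part of the notebook):

```python
import re

# Assumed shape of a verse number: DD:DDD:DDD, e.g. 03:019:024
VERSE_NUMBER = re.compile(r'\b\d{2}:\d{3}:\d{3}\b')

def strip_verse_numbers(line):
    """Remove every verse number of the assumed shape from one line of text."""
    return VERSE_NUMBER.sub('', line)

print(strip_verse_numbers("01:001:001 In the beginning God created"))
```

Inside `clean_up`, the same compiled pattern could take the place of the literal one in the first `re.sub` call.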

#### Startup RDD Session

In [4]:
# Import libraries
import findspark
findspark.init("/usr/local/spark")
from pyspark import SparkContext, SparkConf

# Start session
conf = SparkConf().setMaster("local").setAppName("rdd")
sc = SparkContext(conf = conf)

#### Load data into RDD and cleanup

In [30]:
# Read data
rdd_ulysses = sc.textFile("4300-2.txt")
rdd_bible = sc.textFile("bible.txt")
rdd_stopwords = sc.textFile("stopwords.csv")

# Clean data and remove stopwords and verse number
rdd_ulysess = rdd_ulysses.flatMap(clean_up)
rdd_ulysess_cleaned = rdd_ulysess.subtract(rdd_stopwords)

rdd_bible = rdd_bible.flatMap(clean_up)
rdd_bible_cleaned = rdd_bible.subtract(rdd_stopwords)

# Number of occurrences (MapReduce), sorted by descending count
rdd_ulysess_all = rdd_ulysess_cleaned.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False)
rdd_bible_all = rdd_bible_cleaned.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False)

#### Print Top 30 Word pairs

In [35]:
print("Bible - Top 30 word pairs:")
print(rdd_bible_all.take(30))

print("Ulysess - Top 30 word pairs:")
print(rdd_ulysess_all.take(30))

Bible - Top 30 word pairs:
[('unto', 8997), ('lord', 7830), ('thou', 5474), ('thy', 4600), ('god', 4443), ('ye', 3982), ('thee', 3826), ('israel', 2565), ('son', 2370), ('king', 2270), ('hath', 2264), ('people', 2145), ('house', 2024), ('children', 1802), ('day', 1734), ('land', 1718), ('shalt', 1616), ('hand', 1466), ('saying', 1445), ('behold', 1326), ('saith', 1262), ('sons', 1116), ('hast', 1070), ('david', 1015), ('earth', 987), ('jesus', 983), ('father', 979), ('thine', 938), ('name', 930), ('thereof', 906)]
Ulysess - Top 30 word pairs:
[('bloom', 2798), ('stephen', 1511), ('time', 1146), ('yes', 1082), ('eyes', 987), ('hand', 918), ('street', 879), ('little', 870), ('father', 831), ('day', 753), ('round', 717), ('night', 696), ('head', 666), ('sir', 657), ('dont', 656), ('god', 654), ('name', 651), ('im', 606), ('look', 594), ('life', 583), ('hes', 582), ('john', 582), ('thats', 576), ('poor', 558), ('woman', 558), ('tell', 532), ('voice', 531), ('ill', 522), ('dedalus', 522), (

### iii) Get unique words
Create RDDs that contain only the words unique to each text.

In [36]:
# Get distinct values
rdd_ulysess_dist = rdd_ulysess_cleaned.distinct()
rdd_bible_dist = rdd_bible_cleaned.distinct()

# Number of unique words
rdd_ulysess_dist.count()
rdd_bible_dist.count()


43956
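
`distinct()` yields the vocabulary of each text after stop-word removal. If "unique" is instead read as words that occur in one text and never in the other, a set difference is needed; with the RDD API that could be written as `rdd_ulysess_dist.subtract(rdd_bible_dist)` and the reverse. The plain-Python sketch below shows the same logic on a tiny hand-picked sample (the helper `words_only_in` and both sample lists are illustrative assumptions):

```python
def words_only_in(words_a, words_b):
    """Return the words that occur in words_a but never in words_b."""
    return set(words_a) - set(words_b)

# Tiny hand-picked samples, for illustration only
ulysses_sample = ["bloom", "stephen", "god", "hand"]
bible_sample = ["lord", "god", "hand", "israel"]

print(sorted(words_only_in(ulysses_sample, bible_sample)))
print(sorted(words_only_in(bible_sample, ulysses_sample)))
```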

### iv) Get common words
Finally, create an RDD that contains only the words common to both texts. In this RDD, preserve the numbers of occurrences in the two texts. In other words, a row in your RDD will look like (love 45 32). Print or store the words and the numbers of occurrences.

In [39]:
rdd_combined = rdd_ulysess_all.join(rdd_bible_all)

print("Common Words:")
print(rdd_combined.sortByKey(False).take(10))

Common Words:
[('zion', (15, 152)), ('zealous', (6, 8)), ('zeal', (9, 16)), ('youths', (9, 2)), ('youthful', (18, 1)), ('youth', (93, 70)), ('yourselves', (6, 191)), ('yonder', (6, 7)), ('yokefellow', (3, 1)), ('yoke', (15, 59))]


### v) 20 most frequent words
Create for us the list of 20 most frequently used words common to both texts. In your report, print (store) the words, followed by the number of occurrences in Ulysses and then the Bible. Order your report in descending order starting by the number of occurrences in Ulysses. Present the same data this time ordered by the number of occurrences in the Bible.

In [41]:
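print("Top 20 word pairs (Ulysess):")
rdd_combined = rdd_ulysess_all.join(rdd_bible_all)  # values are (ulysses_count, bible_count)
print(rdd_combined.sortBy(lambda a:a[1], False).take(5))  # take(5) prints only the first 5; take(20) gives the full list

print("Top 20 word pairs (Bible):")
rdd_combined = rdd_bible_all.join(rdd_ulysess_all)  # values are (bible_count, ulysses_count)
print(rdd_combined.sortBy(lambda a:a[1], False).take(5))  # take(5) prints only the first 5; take(20) gives the full list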

Top 20 word pairs (Ulysess):
[('stephen', (1511, 7)), ('time', (1146, 623)), ('yes', (1082, 4)), ('eyes', (987, 503)), ('hand', (918, 1466))]
Top 20 word pairs (Bible):
[('unto', (8997, 15)), ('lord', (7830, 447)), ('thou', (5474, 161)), ('thy', (4600, 141)), ('god', (4443, 654))]
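
A single join is enough for both orderings; only the sort key has to change. With the RDD API the key would be passed to `sortBy`, for example `lambda a: a[1][0]` for the first count and `lambda a: a[1][1]` for the second. The plain-Python sketch below illustrates this on the five rows printed for Ulysses above, so the second ordering only ranks those five rows and is not the overall Bible ranking:

```python
# Rows copied from the output above: (word, (ulysses_count, bible_count))
common = [
    ('stephen', (1511, 7)),
    ('time', (1146, 623)),
    ('yes', (1082, 4)),
    ('eyes', (987, 503)),
    ('hand', (918, 1466)),
]

# Same data, two sort keys: first count (Ulysses) and second count (Bible)
by_ulysses = sorted(common, key=lambda row: row[1][0], reverse=True)
by_bible = sorted(common, key=lambda row: row[1][1], reverse=True)

print([word for word, _ in by_ulysses])
print([word for word, _ in by_bible])
```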


### vi) Get a random sample
List a random sample containing 5% of the words in the final RDD.

In [45]:
rdd_5perc = rdd_combined.takeSample(False, int(rdd_combined.count() * 5/100), seed=123)  # list of exactly 5% of the rows
print("5 percent sample of common words in both books")
print(rdd_5perc)

5 percent sample of common words in both books
[('warrior', (1, 6)), ('fro', (25, 60)), ('lapwing', (2, 15)), ('prophesying', (6, 3)), ('desperate', (2, 3)), ('kinsmen', (7, 3)), ('solicitation', (2, 3)), ('breach', (26, 21)), ('acceptance', (1, 3)), ('food', (55, 78)), ('rolls', (1, 21)), ('pence', (5, 21)), ('disk', (2, 26)), ('anchors', (3, 3)), ('breathed', (4, 18)), ('suppose', (10, 342)), ('applicable', (6, 3)), ('cursed', (72, 24)), ('liked', (1, 93)), ('youths', (2, 9)), ('laying', (13, 21)), ('courage', (20, 24)), ('couple', (10, 36)), ('security', (1, 12)), ('reared', (10, 9)), ('genealogy', (15, 3)), ('issue', (40, 39)), ('97', (2, 3)), ('discomfited', (9, 3)), ('candles', (1, 24)), ('ought', (97, 186)), ('available', (4, 9)), ('virgins', (23, 51)), ('understand', (93, 110)), ('halting', (1, 3)), ('preferred', (5, 18)), ('dine', (3, 3)), ('unmerciful', (1, 3)), ('dying', (6, 107)), ('treasure', (37, 18)), ('soon', (65, 96)), ('repent', (46, 12)), ('enlargement', (1, 3)), ('r
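
`takeSample` returns an exact number of elements as a local list, which is why the count is computed first. The RDD method `sample(withReplacement, fraction, seed)` instead keeps each element independently with the given probability, so `rdd_combined.sample(False, 0.05, 123)` would stay an RDD and contain roughly, not exactly, 5% of the rows. The plain-Python sketch below contrasts the two behaviours; the helper names and the placeholder word list are assumptions made for illustration, and the random numbers will not match Spark's:

```python
import random

def take_exact(items, percent, seed):
    """Pick exactly len(items) * percent // 100 items without replacement."""
    rng = random.Random(seed)
    return rng.sample(items, len(items) * percent // 100)

def take_bernoulli(items, percent, seed):
    """Keep each item independently with probability percent / 100."""
    rng = random.Random(seed)
    return [item for item in items if rng.random() < percent / 100]

words = ["word%d" % i for i in range(1000)]  # placeholder data

print(len(take_exact(words, 5, seed=123)))      # exact size
print(len(take_bernoulli(words, 5, seed=123)))  # size varies around 5%
```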

## Problem 2 (20%)
Implement problem 1 using DataFrame API.

### i) DF word number pairs
Use Spark transformation and action functions present in the *DF API* to transform those texts into DFs that contain words and the number of occurrences of those words in the respective text. From the King James Bible, eliminate all verse numbers of the form *03:019:024*. Eliminate the so-called *stop words* from both DFs. List the 30 most frequent words in each DF (text).

#### Functions

In [46]:
# Function
from pyspark.sql.functions import regexp_replace, trim, col, lower

def removePunctuation(column):
  # Drop everything except letters, digits, whitespace and '+', then lower-case and trim
  return trim(lower(regexp_replace(column, r'([^A-Za-z0-9\s+])', ''))).alias('words')

#### Create Session

In [24]:
# Import libraries
import findspark
findspark.init("/usr/local/spark")
from pyspark.sql import SparkSession
from pyspark.sql.functions import split          # Function to split data
from pyspark.sql.functions import explode        # Equivalent to flatMap

# Create Session
spark = SparkSession.builder.master("local") \
                    .appName("df").getOrCreate()

#### Data

In [47]:
# Read data
df_ulysses = spark.read.text("4300-2.txt")
df_bible = spark.read.text("bible.txt")
df_stopwords = spark.read.text("stopwords.csv")

# Select words
df_ulysses_all = df_ulysses.select(split(df_ulysses.value, " ").alias("words"))
df_ulysses_all = df_ulysses_all.select(explode(df_ulysses_all.words).alias("words"))
df_ulysses_all = df_ulysses_all.select(removePunctuation(col('words')))
df_ulysses_all = df_ulysses_all.filter('words IS NOT NULL AND words != ""')

df_bible_all = df_bible.select(split(df_bible.value, " ").alias("words"))
df_bible_all = df_bible_all.select(explode(df_bible_all.words).alias("words"))
df_bible_all = df_bible_all.select(removePunctuation(col('words')))
df_bible_all = df_bible_all.filter('words IS NOT NULL AND words != ""')

# Remove stopwords
df_ulysses_cleaned = df_ulysses_all.join(df_stopwords, df_ulysses_all.words == df_stopwords.value, 'left_anti').select(df_ulysses_all.words)
df_bible_cleaned = df_bible_all.join(df_stopwords, df_bible_all.words == df_stopwords.value, 'left_anti').select(df_bible_all.words)

In [48]:
# Get frequent word pair
df_ulysses_unique = df_ulysses_cleaned.groupBy("words").count()
df_ulysses_unique = df_ulysses_unique.orderBy(["count"], ascending=False)
print("30 Most frequent words: ")
print(df_ulysses_unique.show(30))

30 Most frequent words: 
+-------+-----+
|  words|count|
+-------+-----+
|  bloom| 2798|
|stephen| 1511|
|   time| 1146|
|    yes| 1082|
|   eyes|  987|
|   hand|  918|
| street|  879|
| little|  870|
| father|  831|
|    day|  753|
|  round|  717|
|  night|  696|
|   head|  666|
|    sir|  657|
|   dont|  656|
|    god|  654|
|   name|  651|
|     im|  606|
|   look|  594|
|   life|  583|
|    hes|  582|
|   john|  582|
|  thats|  576|
|   poor|  558|
|  woman|  558|
|   tell|  532|
|  voice|  531|
|    ill|  522|
|dedalus|  522|
|  house|  511|
+-------+-----+
only showing top 30 rows

None


In [49]:
# Get frequent word pair
df_bible_unique = df_bible_cleaned.groupBy("words").count()
df_bible_unique = df_bible_unique.orderBy(["count"], ascending=False)
print("30 Most frequent words: ")
print(df_bible_unique.show(30))

30 Most frequent words: 
+--------+-----+
|   words|count|
+--------+-----+
|    unto| 8997|
|    lord| 7830|
|    thou| 5474|
|     thy| 4600|
|     god| 4443|
|      ye| 3982|
|    thee| 3826|
|  israel| 2565|
|     son| 2370|
|    king| 2270|
|    hath| 2264|
|  people| 2145|
|   house| 2024|
|children| 1802|
|     day| 1734|
|    land| 1718|
|   shalt| 1616|
|    hand| 1466|
|  saying| 1445|
|  behold| 1326|
|   saith| 1262|
|    sons| 1116|
|    hast| 1070|
|   david| 1015|
|   earth|  987|
|   jesus|  983|
|  father|  979|
|   thine|  938|
|    name|  930|
| thereof|  906|
+--------+-----+
only showing top 30 rows

None


### ii) Get unique words
Create DFs that contain only the words unique to each text.


In [50]:
df_ulysess_dist = df_ulysses_all.distinct()
df_ulysess_dist.count()


30038

In [51]:
df_bible_dist = df_bible_all.distinct()
df_bible_dist.count()


44285

### iii) Get common words
Finally, create a DF that contains only the words common to both texts. In this DF, preserve the numbers of occurrences in the two texts. In other words, a row in your DF will look like (love 45 32). Print or store the words and the numbers of occurrences.


In [57]:
df_combined = df_ulysses_unique.join(df_bible_unique, df_ulysses_unique.words == df_bible_unique.words, 'inner')
df_combined.show(5)

+-------+-----+-------+-----+
|  words|count|  words|count|
+-------+-----+-------+-----+
|stephen| 1511|stephen|    7|
|   time| 1146|   time|  623|
|    yes| 1082|    yes|    4|
|   eyes|  987|   eyes|  503|
|   hand|  918|   hand| 1466|
+-------+-----+-------+-----+
only showing top 5 rows



### iv) 20 most frequent words
Create for us the list of 20 most frequently used words common to both texts. In your report, print (store) the words, followed by the number of occurrences in Ulysses and then the Bible. Order your report in descending order starting by the number of occurrences in Ulysses. Present the same data this time ordered by the number of occurrences in the Bible.


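In [None]:
# Give the two count columns distinct names, then order by the Bible count
df_common = df_combined.select(
    df_ulysses_unique["words"],
    df_ulysses_unique["count"].alias("ulysses_count"),
    df_bible_unique["count"].alias("bible_count"))
df_by_bible = df_common.orderBy(col("bible_count").desc())


In [58]:
print(df_combined.count())

5771


In [None]:
from pyspark.sql.functions import sum as sum_  # avoid shadowing the built-in sum

# Words followed by the number of occurrences in Ulysses and then the Bible,
# ordered by the number of occurrences in the Bible
df_by_bible.show(20)
df_by_bible.agg(sum_("bible_count").alias("sum_bible_count")).show()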

### v) Get a random sample
List a random sample containing 5% of the words in the final DF.

In [8]:
from pyspark.sql.functions import sum as sum_  # avoid shadowing the built-in sum

# Order the report in descending order of the number of occurrences in Ulysses
df_by_ulysses = df_combined.select(
    df_ulysses_unique["words"],
    df_ulysses_unique["count"].alias("ulysses_count"),
    df_bible_unique["count"].alias("bible_count")).orderBy(col("ulysses_count").desc())
df_by_ulysses.show(20)
df_by_ulysses.agg(sum_("ulysses_count").alias("sum_ulysses_count")).show()

# Random sample containing roughly 5% of the words in the final DF
final_df_sample = df_by_ulysses.sample(False, 0.05, 13)
final_df_sample.show()
print(final_df_sample.count())


## Problem 3 (30%)
Consider attached files *transactions.txt* and *products.txt*.

### i) Load data
Each line in *transactions.txt* file contains a *transaction date*, *time*, *customer id*, *product id*, *quantity bought* and *price paid*, delimited with hash (#) sign. Each line in file *products.txt* contains *product id*, *product name*, *unit price* and *quantity available* in the store. Bring those data in Spark and organize it as DataFrames with named columns.


In [9]:
# Read data
df_transactions = spark.read.csv("transactions.txt", sep="#")
df_products = spark.read.csv("products.txt", sep="#")

In [10]:
# Name the columns in file order
df_transactions = df_transactions.toDF("transaction_date", "time", "customer_id", "product_id", "quantity_bought", "price_paid")
df_products = df_products.toDF("product_id", "product_name", "unit_price", "quantity")

### ii) Largest spending
Using either DataFrame methods or plain SQL statements find 5 customers with the largest spent on the day. Find the names of the products each of those 5 customers bought.

In [11]:
# Total spend per customer and day, largest first
df_cust_spend=df_transactions.groupBy('customer_id', 'transaction_date').agg({'price_paid': 'sum'})
df_cust_spend = df_cust_spend.orderBy('sum(price_paid)', ascending=False)


In [12]:
# Create tables
df_cust_spend.createOrReplaceTempView("tbl_cust_spend")
df_transactions.createOrReplaceTempView("tbl_transactions")
df_products.createOrReplaceTempView("tbl_products")

In [13]:
df_top5 = spark.sql("SELECT * FROM tbl_cust_spend LIMIT 5")
df_top5.show()

+-----------+----------------+------------------+
|customer_id|transaction_date|   sum(price_paid)|
+-----------+----------------+------------------+
|         76|      2015-03-30|100049.00000000001|
|         53|      2015-03-30| 88829.76000000001|
|         56|      2015-03-30|          85906.94|
|         51|      2015-03-30|          83312.12|
|         31|      2015-03-30|          83202.61|
+-----------+----------------+------------------+



In [14]:
df_transactions.show(5)

+----------------+--------+-----------+----------+---------------+----------+
|transaction_date|    time|customer_id|product_id|quantity_bought|price_paid|
+----------------+--------+-----------+----------+---------------+----------+
|      2015-03-30| 6:55 AM|         51|        68|              1|   9506.21|
|      2015-03-30| 7:39 PM|         99|        86|              5|   4107.59|
|      2015-03-30|11:57 AM|         79|        58|              7|   2987.22|
|      2015-03-30|12:46 AM|         51|        50|              6|   7501.89|
|      2015-03-30|11:39 AM|         86|        24|              5|    8370.2|
+----------------+--------+-----------+----------+---------------+----------+
only showing top 5 rows



In [15]:
# Keep only the transactions of the five top-spending customers (inner join), then look up the product names
df_top5_products = df_transactions.join(df_top5, df_transactions.customer_id == df_top5.customer_id, "inner").select(df_transactions.customer_id, df_transactions.product_id)
df_top5_list = df_top5_products.join(df_products, df_top5_products.product_id == df_products.product_id, "left").select(df_top5_products.customer_id, df_products.product_name)
df_top5_list.orderBy("customer_id").show()



### iii) Total number sold
Find the names and total number sold of the 10 most popular products. Order the products once by the number sold and then by the total value (quantity*price) sold.

In [16]:
# List the sum of sold products
df_sum_products=df_transactions.groupBy('product_id').agg({'quantity_bought': 'sum'})
df_sum_products = df_sum_products.orderBy('sum(quantity_bought)', ascending=False)

# Get top ten results
df_sum_products.createOrReplaceTempView("tbl_sum_products")
df_top10_products = spark.sql("SELECT * FROM tbl_sum_products LIMIT 10")

# Calculate the total value
df_products_distinct = df_products.select(df_products.product_id, df_products.product_name, df_products.unit_price).distinct()
df_top10_products = df_top10_products.join(df_products_distinct, df_top10_products.product_id == df_products_distinct.product_id, "left")
df_top10_products = df_top10_products.select(df_top10_products['product_name'], df_top10_products['sum(quantity_bought)'].alias("quantity"), df_top10_products['unit_price'], (df_top10_products['sum(quantity_bought)'] * df_top10_products['unit_price']).alias("Total value"))
df_top10_products.show()

+--------------------+--------+----------+------------------+
|        product_name|quantity|unit_price|       Total value|
+--------------------+--------+----------+------------------+
|Notebook Lenovo U...|   226.0|    461.08|         104204.08|
|SAMSUNG LED TV 39...|   142.0|   2531.15|          359423.3|
|               Jafra|   102.0|   3715.07|         378937.14|
|            Jantoven|   102.0|    3255.4|          332050.8|
|Far Cry 4 Limited...|   101.0|    711.88|          71899.88|
|Roller Derby Roll...|    91.0|   7783.79|         708324.89|
|Procesor Intel Co...|    90.0|   4570.99|          411389.1|
|  Sony Playstation 3|    88.0|   5088.35|447774.80000000005|
|    chest congestion|    84.0|   1305.04|         109623.36|
|Barbie Beach Ken ...|    82.0|    742.84|60912.880000000005|
+--------------------+--------+----------+------------------+
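
The table above is ordered by the number sold only. The second ordering the task asks for, by total value, needs nothing more than a different sort key; with the DataFrame API that could be an `orderBy` on the `Total value` column in descending order. The plain-Python sketch below applies that ordering to the ten rows of the table, with product names kept truncated exactly as they are printed there:

```python
# (product_name, quantity, unit_price) copied from the table above
top10 = [
    ("Notebook Lenovo U...", 226.0, 461.08),
    ("SAMSUNG LED TV 39...", 142.0, 2531.15),
    ("Jafra", 102.0, 3715.07),
    ("Jantoven", 102.0, 3255.4),
    ("Far Cry 4 Limited...", 101.0, 711.88),
    ("Roller Derby Roll...", 91.0, 7783.79),
    ("Procesor Intel Co...", 90.0, 4570.99),
    ("Sony Playstation 3", 88.0, 5088.35),
    ("chest congestion", 84.0, 1305.04),
    ("Barbie Beach Ken ...", 82.0, 742.84),
]

# Order by total value (quantity * unit price), largest first
by_value = sorted(top10, key=lambda row: row[1] * row[2], reverse=True)

for name, quantity, unit_price in by_value:
    print("%-22s %6.0f %12.2f" % (name, quantity, quantity * unit_price))
```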



## Problem 4 (20%)
Implement problem 3 using RDD APIs.

### i) Load data
Each line in *transactions.txt* file contains a *transaction date*, *time*, *customer id*, *product id*, *quantity bought* and *price paid*, delimited with hash (#) sign. Each line in file *products.txt* contains *product id*, *product name*, *unit price* and *quantity available* in the store. Bring those data in Spark and organize it as DataFrames with named columns.

In [26]:
from pyspark.sql import SQLContext, Row
rdd_transactions = sc.textFile("transactions.txt")
rdd_transactions = rdd_transactions.map(lambda x: x.split("#"))
rdd_transactions = rdd_transactions.map(lambda x: Row(transaction_date = x[0], time = x[1], customer_id = int(x[2]), product_id = int(x[3]), quantity_bought = int(x[4]), price_paid = float(x[5])))

rdd_products = sc.textFile("products.txt")
rdd_products = rdd_products.map(lambda x: x.split("#"))
rdd_products = rdd_products.map(lambda x: Row(product_id = int(x[0]), product_name = x[1], unit_price = float(x[2]), quantity = int(x[3])))

### ii) Largest spending
Using either DataFrame methods or plain SQL statements find 5 customers with the largest spent on the day. Find the names of the products each of those 5 customers bought.

In [27]:
# Build a DataFrame from the RDD of Rows; column names and types are inferred from the Row fields
sch_transactions = spark.createDataFrame(rdd_transactions)

# Creates a temporary view using the DataFrame
sch_transactions.createOrReplaceTempView("tbl_transactions")

In [28]:
rdd_transactions.take(5)

[Row(customer_id=51, price_paid=9506.21, product_id=68, quantity_bought=1, time='6:55 AM', transaction_date='2015-03-30'),
 Row(customer_id=99, price_paid=4107.59, product_id=86, quantity_bought=5, time='7:39 PM', transaction_date='2015-03-30'),
 Row(customer_id=79, price_paid=2987.22, product_id=58, quantity_bought=7, time='11:57 AM', transaction_date='2015-03-30'),
 Row(customer_id=51, price_paid=7501.89, product_id=50, quantity_bought=6, time='12:46 AM', transaction_date='2015-03-30'),
 Row(customer_id=86, price_paid=8370.2, product_id=24, quantity_bought=5, time='11:39 AM', transaction_date='2015-03-30')]
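
Since this problem asks for the RDD API, the spend per customer can also be computed without SQL: map each row to a `(customer_id, price_paid)` pair, sum per key with `reduceByKey`, sort by the total in descending order with `sortBy`, and `take(5)`. The plain-Python sketch below walks through the same steps on the five rows shown above; the helper `reduce_by_key` is a hypothetical stand-in for Spark's `reduceByKey`:

```python
from collections import defaultdict

# (customer_id, price_paid) taken from the five rows shown above
rows = [(51, 9506.21), (99, 4107.59), (79, 2987.22), (51, 7501.89), (86, 8370.2)]

def reduce_by_key(pairs):
    """Sum the values of each key, like reduceByKey(lambda a, b: a + b)."""
    totals = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

spend = reduce_by_key(rows)

# Largest spend first, keep at most five customers
top = sorted(spend.items(), key=lambda kv: kv[1], reverse=True)[:5]
print(top)
```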

In [29]:

transactions_rdd = sc.textFile("file:////Users/swaite/Stirling/CSIE-63/assignment-4/data/inputs/transactions.txt") \
                     .map(lambda x: x.split("#"))
transactions_rdd = transactions_rdd.map(lambda x:
                                        Row(
                                            transaction_date=str(x[0]),
                                            time=str(x[1]),
                                            customer_id=int(x[2]),
                                            product_id=int(x[3]),
                                            quantity_bought=int(x[4]),
                                            price_paid=float(x[5])
                                        ))
transactions_df = spark.createDataFrame(transactions_rdd)
transactions_df.show(10)

# Each line in products.txt contains: product id, product name, unit price,
# and quantity available in the store.
products_rdd = sc.textFile("file:////Users/swaite/Stirling/CSIE-63/assignment-4/data/inputs/products.txt")\
                 .map(lambda x: x.split("#"))
products_rdd = products_rdd.map(lambda x:
                                Row(
                                    product_id=int(x[0]),   # int, to match transactions.product_id
                                    product_name=str(x[1]),
                                    unit_price=float(x[2]),
                                    quantity=int(x[3])
                                ))
products_df = spark.createDataFrame(products_rdd)
products_df.show(10)

# Find the 5 customers with the largest spend on the day.
# price_paid is treated as the amount paid for the whole transaction, as in Problem 3.
transactions_df.createOrReplaceTempView("transactions")
products_df.createOrReplaceTempView("products")
top5_df = spark.sql(
                    """
                        SELECT
                        customer_id,
                        SUM(price_paid) AS net_rev
                        FROM transactions
                        GROUP BY customer_id
                        ORDER BY net_rev DESC
                        LIMIT 5
                    """)

top5_df.show()

### iii) Total number sold
Find the names and total number sold of the 10 most popular products. Order the products once by the number sold and then by the total value (quantity*price) sold.