### Practice 2 - Question 1

`Instructions`
- Get all customers who have placed an order with an amount greater than 200.
- Input files are tab-delimited files at the HDFS locations below:
	- /user/cloudera/practice2/problem3/customers
	- /user/cloudera/practice2/problem3/orders
	- /user/cloudera/practice2/problem3/order_items


`Output Requirement`
- Output should be placed in the HDFS location /user/cloudera/practice2/problem3/joinResults
- Output file should be a comma-separated file with customer_fname,customer_lname,customer_city,order_amount.
- The following header should be added to the output: fname,lname,city,price


```shell
#load data into HDFS
sqoop import --connect "jdbc:mysql://localhost/retail_db" --password admin --username root --table orders --fields-terminated-by "\t" --target-dir /user/cloudera/practice2/problem3/orders
sqoop import --connect "jdbc:mysql://localhost/retail_db" --password admin --username root --table order_items --fields-terminated-by "\t" --target-dir /user/cloudera/practice2/problem3/order_items
sqoop import --connect "jdbc:mysql://localhost/retail_db" --password admin --username root --table customers --fields-terminated-by "\t" --target-dir /user/cloudera/practice2/problem3/customers
```

```python
#load datasets (the pyspark shell provides the `spark` session)
orders = spark.read.option("sep","\t").format("csv").load("/user/cloudera/practice2/problem3/orders")
orderItems = spark.read.option("sep","\t").format("csv").load("/user/cloudera/practice2/problem3/order_items")
customers = spark.read.option("sep","\t").format("csv").load("/user/cloudera/practice2/problem3/customers")
#rename column names
orders = orders.selectExpr("_c0 as order_id", "_c2 as customer_id")
orderItems = orderItems.selectExpr("_c1 as order_id", "_c4 as order_item_subtotal")
customers = customers.selectExpr("_c0 as customer_id", "_c1 as customer_fname", "_c2 as customer_lname", "_c6 as customer_city")
#run spark sql queries: total amount per order, keeping only orders above 200
orderItems.createOrReplaceTempView("orderItems")
orderItemsFiltered = spark.sql("select order_id, sum(order_item_subtotal) as order_amount from orderItems group by order_id having sum(order_item_subtotal) > 200")
#join dataframes
o_oi_join = orders.join(orderItemsFiltered, "order_id")
result = o_oi_join.join(customers, "customer_id")
final = result.selectExpr("customer_fname as fname", "customer_lname as lname", "customer_city as city", "order_amount as price")
#save to HDFS as a single comma-separated file so the required header is written once
final.coalesce(1).write.option("sep",",").option("header","true").format("csv").save("/user/cloudera/practice2/problem3/joinResults")
```
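The query sums `order_item_subtotal` per order and keeps totals above 200 through the `having` clause. It then inner-joins the result with orders and customers. The plain-Python sketch below reproduces that logic on a few hypothetical in-memory rows, so the expected CSV layout (header included) can be checked without a cluster. The helper names `orders_over`, `join_results` and `to_csv` are illustrative only and are not part of the exercise.

```python
import csv
import io
from collections import defaultdict

# Hypothetical sample rows (not taken from retail_db).
orders = [(1, 11), (2, 12), (3, 11)]                            # (order_id, customer_id)
order_items = [(1, 150.0), (1, 80.0), (2, 50.0), (3, 210.5)]    # (order_id, order_item_subtotal)
customers = {11: ("Mary", "Smith", "Caguas"), 12: ("John", "Doe", "Aurora")}


def orders_over(items, threshold):
    """Sum subtotals per order and keep totals above threshold (the HAVING clause)."""
    totals = defaultdict(float)
    for order_id, subtotal in items:
        totals[order_id] += subtotal
    return {oid: amount for oid, amount in totals.items() if amount > threshold}


def join_results(orders, items, customers, threshold=200):
    """Inner-join orders with the filtered totals and with customers."""
    amounts = orders_over(items, threshold)
    rows = []
    for order_id, customer_id in orders:
        if order_id in amounts and customer_id in customers:
            fname, lname, city = customers[customer_id]
            rows.append((fname, lname, city, amounts[order_id]))
    return rows


def to_csv(rows):
    """Render rows as comma-separated text with the header the task asks for."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["fname", "lname", "city", "price"])
    writer.writerows(rows)
    return buf.getvalue()


print(to_csv(join_results(orders, order_items, customers)))
```

Because the join is an inner join, a customer appears once per qualifying order. In this sample, Mary is listed twice.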

### Practice 2 - Question 2

`Instructions`
- Get customers from the metastore table named "customers_hive" whose fname is like "Rich" (i.e. starts with "Rich") and save the results in HDFS.


`Output Requirement`
- Result should be saved in /user/cloudera/practice2/problem4/customers/output.
- Output should contain only fname, lname and city
- Output should be saved in text format.
- Output should be sorted by customer_city
- fname and lname should be separated by a tab, and the city should be separated by a colon


```shell
#load data into HDFS and create the Hive table
sqoop import --connect "jdbc:mysql://localhost/retail_db" --username root --password admin --table customers --warehouse-dir /user/cloudera/problem3/customers_hive/input --hive-import --create-hive-table --hive-database default --hive-table customers_hive
```

```python
#run spark sql queries (single quotes inside the double-quoted Python string avoid a syntax error)
res = spark.sql("select concat(customer_fname, '\t', customer_lname, ':', customer_city) as value from customers_hive where customer_fname like 'Rich%' order by customer_city")
#save to HDFS (the text format requires exactly one string column)
res.write.format("text").save("/user/cloudera/practice2/problem4/customers/output")
```
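The filter, sort and formatting steps can be mirrored in plain Python to preview the output lines. This is a sketch on hypothetical rows. It assumes that `LIKE 'Rich%'` behaves as a case-sensitive prefix match, which is how Spark SQL evaluates it.

```python
# Hypothetical (customer_fname, customer_lname, customer_city) rows.
customers = [
    ("Richard", "Doe", "Brownsville"),
    ("Mary", "Roe", "Littleton"),
    ("Richard", "Poe", "Aurora"),
    ("Rich", "Moe", "Caguas"),
    ("richie", "Low", "Bayamon"),   # lowercase, so the case-sensitive prefix match drops it
]


def rich_lines(rows, prefix="Rich"):
    """Filter on a name prefix, sort by city, and format as fname<TAB>lname:city."""
    matches = [r for r in rows if r[0].startswith(prefix)]
    matches.sort(key=lambda r: r[2])   # ORDER BY customer_city
    return [f"{fname}\t{lname}:{city}" for fname, lname, city in matches]


for line in rich_lines(customers):
    print(line)
```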

### Practice 2 - Question 3

`Instructions`
- Given a pipe-delimited file, get the total number of customers in each state whose first name starts with 'M', and save the results in HDFS.
- Input folder: /user/cloudera/problem2/customer/tab

`Output Requirement`
- Result should be saved in a Hive table "customer_m"
- File format should be Parquet with gzip compression.
- Output should have the state name followed by the total number of customers in that state.

```shell
#load data into HDFS
sqoop import --connect "jdbc:mysql://localhost/retail_db" --password admin --username root --table customers --target-dir /user/cloudera/problem2/customer/tab --fields-terminated-by "|" --columns "customer_id,customer_fname,customer_state"
```

```python
#load dataset
customers = spark.read.option("sep","|").format("csv").load("/user/cloudera/problem2/customer/tab")
#rename columns
customers = customers.selectExpr("_c0 as customer_id", "_c1 as customer_fname", "_c2 as customer_state")
#filter data
customersFiltered = customers.filter(customers.customer_fname.like("M%"))
#run spark sql queries (alias the aggregate: the Hive write fails on an unaliased column such as count(customer_fname))
customersFiltered.createOrReplaceTempView("customers")
res = spark.sql("select customer_state, count(customer_fname) as count from customers group by customer_state")
#save to Hive
res.write.mode("overwrite").option("compression","gzip").option("fileFormat","parquet").format("hive").saveAsTable("customer_m")
```
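The group-by count amounts to counting states among the customers whose first name matches `M%`. Here is a plain-Python sketch on hypothetical rows, using `collections.Counter` in place of `group by`. It shows which rows are counted and does not reproduce the Hive write.

```python
from collections import Counter

# Hypothetical (customer_id, customer_fname, customer_state) rows.
customers = [
    (1, "Mary", "TX"),
    (2, "Mark", "CA"),
    (3, "John", "TX"),
    (4, "Maria", "TX"),
    (5, "mike", "PR"),   # LIKE 'M%' is case-sensitive, so this row is not counted
]


def m_customers_by_state(rows):
    """Count customers per state whose first name starts with 'M'."""
    return Counter(state for _id, fname, state in rows if fname.startswith("M"))


counts = m_customers_by_state(customers)
for state, n in sorted(counts.items()):
    print(state, n)   # state name followed by its customer count
```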

### Practice 2 - Question 4

`Instructions`
- Given a metastore table named product_ranked containing product details, find the most expensive product in each category.


`Output Requirement`
- Output should have product_category_id, product_name, product_price, rank.
- Result should be saved in /user/cloudera/practice4/output/ as a pipe-delimited text file

```shell
#load data into HDFS and create the Hive table
sqoop import --connect "jdbc:mysql://localhost/retail_db" --username root --password admin --table products --warehouse-dir /user/cloudera/practice4.db --hive-import --create-hive-table --hive-database default --hive-table product_ranked -m 1
```

```python
#preview the table
spark.sql("select * from product_ranked limit 10").show()
#rank products by price within each category
product_ranked_all = spark.sql("select product_category_id, product_name, product_price, RANK() over (partition by pr.product_category_id order by pr.product_price desc) as rank from product_ranked pr")
#keep only the top-ranked (most expensive) product(s) in each category
product_ranked_first = product_ranked_all.filter(product_ranked_all["rank"] == 1)
#convert into rdd and use map function to add pipe delimiters
productsRDD = product_ranked_first.rdd.map(lambda line: "|".join([str(x) for x in line]))
#save to HDFS (an RDD has no write method, so use saveAsTextFile)
productsRDD.saveAsTextFile("/user/cloudera/practice4/output")
```
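`RANK()` gives tied rows the same rank and then skips ahead, so a category with two equally priced top products contributes both rows to the `rank = 1` result. If exactly one row per category were wanted, `ROW_NUMBER()` would be the usual alternative. The plain-Python sketch below emulates `RANK() OVER (PARTITION BY category ORDER BY price DESC)` on hypothetical rows to make the tie behaviour visible.

```python
from itertools import groupby

# Hypothetical (product_category_id, product_name, product_price) rows.
products = [
    (2, "Glove A", 59.99),
    (2, "Glove B", 129.99),
    (2, "Glove C", 129.99),
    (3, "Ball", 19.99),
    (3, "Bat", 49.98),
]


def ranked(rows):
    """Append a RANK()-style value per category, ordered by price descending."""
    out = []
    rows = sorted(rows, key=lambda r: (r[0], -r[2]))
    for _category, group in groupby(rows, key=lambda r: r[0]):
        rank, prev_price = 0, None
        for position, row in enumerate(group, start=1):
            if row[2] != prev_price:   # a new price takes its position; ties keep the earlier rank
                rank, prev_price = position, row[2]
            out.append(row + (rank,))
    return out


top = [r for r in ranked(products) if r[3] == 1]
lines = ["|".join(str(x) for x in r) for r in top]
print("\n".join(lines))
```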

### Practice 2 - Question 5

`Instructions`
- Fetch all pending orders from the data files stored at HDFS location /user/cloudera/problem3/parquet and save them as a JSON file in HDFS


`Output Requirement`
- Result should be saved in /user/cloudera/problem3/orders_pending
- Output file should be saved as a JSON file.
- Output file should be Gzip-compressed.

```shell
#load data into HDFS
sqoop import --connect "jdbc:mysql://localhost/retail_db" --password admin --username root --table orders --as-parquetfile --target-dir /user/cloudera/problem3/parquet
```

```python
#load as dataframe
orders = spark.read.format("parquet").load("/user/cloudera/problem3/parquet")
#filter pending orders
ordersFiltered = orders.filter(orders.order_status == 'PENDING')
#save to HDFS
ordersFiltered.write.option("compression","gzip").format("json").save("/user/cloudera/problem3/orders_pending")
```
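Spark's JSON writer emits one JSON object per line, and the `gzip` option compresses each part file. The sketch below reproduces that layout with Python's `gzip` and `json` modules so the result of the filter can be inspected. The rows and the part-file name are hypothetical. The exact-match `== 'PENDING'` filter leaves out a status such as `PENDING_PAYMENT` (if present in the data). Whether that is intended depends on how "pending" is read.

```python
import gzip
import json
import os
import tempfile

# Hypothetical order rows.
orders = [
    {"order_id": 1, "order_status": "CLOSED"},
    {"order_id": 2, "order_status": "PENDING_PAYMENT"},
    {"order_id": 3, "order_status": "PENDING"},
]


def write_pending(rows, path):
    """Write rows whose status is exactly PENDING as gzip-compressed JSON lines."""
    pending = [r for r in rows if r["order_status"] == "PENDING"]
    with gzip.open(path, "wt", encoding="utf-8") as fh:
        for r in pending:
            fh.write(json.dumps(r) + "\n")   # one JSON object per line
    return len(pending)


def read_back(path):
    """Decompress and parse the JSON lines again."""
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        return [json.loads(line) for line in fh]


path = os.path.join(tempfile.mkdtemp(), "part-00000.json.gz")   # hypothetical file name
write_pending(orders, path)
print(read_back(path))
```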

### Practice 2 - Question 6

`Instructions`
- Given a tab-delimited file at HDFS location /user/cloudera/problem3/all/customer/input
- Save only the first 4 fields in the result as a pipe-delimited file in HDFS


`Output Requirement`
- Result should be saved in /user/cloudera/problem3/all/customer/output
- Output file should be saved in text format.

```shell
#load data into HDFS
sqoop import --connect "jdbc:mysql://localhost/retail_db" --password admin --username root --table customers --fields-terminated-by '\t' --target-dir /user/cloudera/problem3/all/customer/input
```

```python
#load dataframe
customers = spark.read.option("sep","\t").format("csv").load("/user/cloudera/problem3/all/customer/input")
#select the first four fields
customersRenamed = customers.selectExpr("_c0 as customer_id", "_c1 as customer_fname", "_c2 as customer_lname", "_c3 as customer_email")
#convert into rdd and use map function to add pipe delimiters
customersRDD = customersRenamed.rdd.map(lambda line: "|".join([str(x) for x in line]))
#save to HDFS (an RDD has no write method, so use saveAsTextFile)
customersRDD.saveAsTextFile("/user/cloudera/problem3/all/customer/output")
```
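One side effect of `str(x)` in the mapping lambda is that a null field, which reaches Python as `None`, is written as the text `None`. Below is a plain-Python sketch of the per-line transformation with a null-safe join. The helper names `to_pipe` and `first_four_piped` and the sample line are hypothetical.

```python
def to_pipe(fields, sep="|"):
    """Join fields with sep; None (a Spark null) becomes an empty field, not the string 'None'."""
    return sep.join("" if x is None else str(x) for x in fields)


def first_four_piped(line):
    """Split a tab-delimited line and keep only its first four fields, pipe-delimited."""
    fields = line.rstrip("\n").split("\t")
    return to_pipe(fields[:4])


sample = "1\tAnn\tLee\tann@example.com\tpw\t1 Main St\tCaguas\tPR\t00725\n"
print(first_four_piped(sample))  # 1|Ann|Lee|ann@example.com
```

Inside the Spark job, the same idea would replace the lambda body with `to_pipe(line)`.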

### Practice 2 - Question 7

`Instructions`
- Find the top 10 products that generated the highest revenue.
- The order_items and products data are placed in HDFS directories /user/cloudera/practice4_ques6/order_items/ and /user/cloudera/practice4_ques6/products/ respectively.


`Output Requirement`
- Output should have product_id and revenue separated by ':' and should be saved in /user/cloudera/practice4_ques6/output

```shell
#load data into HDFS (Sqoop's default field delimiter is a comma)
sqoop import --connect "jdbc:mysql://localhost/retail_db" --username root --password admin --table order_items --target-dir /user/cloudera/practice4_ques6/order_items
sqoop import --connect "jdbc:mysql://localhost/retail_db" --username root --password admin --table products --target-dir /user/cloudera/practice4_ques6/products/ -m 1
```

```python
#load dataframes
orderItems = spark.read.format("csv").load("/user/cloudera/practice4_ques6/order_items")
products = spark.read.format("csv").load("/user/cloudera/practice4_ques6/products")
#rename columns
orderItemsRenamed = orderItems.selectExpr("_c2 as product_id", "_c4 as order_item_subtotal")
productsRenamed = products.selectExpr("_c0 as product_id")
#join two dataframes
join = productsRenamed.join(orderItemsRenamed, "product_id")
#run sql queries: revenue per product, top 10 by revenue
join.createOrReplaceTempView("oi_p")
res = spark.sql("select product_id, sum(order_item_subtotal) as product_revenue from oi_p group by product_id order by product_revenue desc limit 10")
#add colon delimiters
final = res.rdd.map(lambda line: ":".join([str(x) for x in line]))
#save to HDFS
final.saveAsTextFile("/user/cloudera/practice4_ques6/output")
```
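The aggregation and top-N step is equivalent to summing subtotals per product and taking the largest totals. Below is a plain-Python sketch on hypothetical rows. `heapq.nlargest` stands in for `order by ... desc limit N`.

```python
import heapq
from collections import defaultdict

# Hypothetical (product_id, order_item_subtotal) rows.
order_items = [(1, 10.0), (2, 5.0), (1, 2.5), (3, 7.0)]


def top_products(items, n=10):
    """Sum revenue per product and return the n highest as (product_id, revenue) pairs."""
    revenue = defaultdict(float)
    for product_id, subtotal in items:
        revenue[product_id] += subtotal
    return heapq.nlargest(n, revenue.items(), key=lambda kv: kv[1])


lines = [f"{pid}:{rev}" for pid, rev in top_products(order_items, n=2)]
print("\n".join(lines))
```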

### Practice 2 - Question 8

`Instructions`
- Find all customers who live in the city of 'Brownsville' and save the result into HDFS.
- Input folder is /user/cloudera/problem6/customer/text


`Output Requirement`
- Result should be saved in /user/cloudera/problem6/customer_Brownsville
- Output file should be saved in JSON format

```shell
#load data into HDFS
sqoop import --connect "jdbc:mysql://localhost/retail_db" --password admin --username root --table customers --fields-terminated-by '\t' --columns "customer_id,customer_fname,customer_city" --target-dir /user/cloudera/problem6/customer/text
```

```python
#load dataframe
customers = spark.read.option("sep","\t").format("csv").load("/user/cloudera/problem6/customer/text")
#rename columns
customersRenamed = customers.selectExpr("_c0 as customer_id", "_c1 as customer_fname", "_c2 as customer_city")
#keep only customers whose city is Brownsville
customersFiltered = customersRenamed.filter(customersRenamed.customer_city == "Brownsville")
#save to HDFS at the location given in the output requirement
customersFiltered.write.mode("overwrite").format("json").save("/user/cloudera/problem6/customer_Brownsville")
```
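The CSV is read without a schema, so every column (including `customer_id`) is a string and appears quoted in the JSON output. The comparison with `"Brownsville"` is also case-sensitive. The plain-Python sketch below mirrors both points on hypothetical rows. Spark's own JSON output is compact and omits the spaces that `json.dumps` adds by default, so only the content matches.

```python
import json

# Hypothetical (customer_id, customer_fname, customer_city) rows, all strings as in a schema-less CSV read.
rows = [
    ("1", "Ann", "Brownsville"),
    ("2", "Bob", "Caguas"),
    ("3", "Cy", "brownsville"),   # different case, so it does not match
]


def city_json_lines(rows, city):
    """Return one JSON object per matching row, keyed by the renamed column names."""
    keys = ("customer_id", "customer_fname", "customer_city")
    return [json.dumps(dict(zip(keys, r))) for r in rows if r[2] == city]


for line in city_json_lines(rows, "Brownsville"):
    print(line)
```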
