DATA ENGINEER - PYTHON PYSPARK

This test consits of fifteen problems. You are required to write your code in cell below each problem and output the result in cell next to it 

INSTRUCTIONS:
    1. You are required to download and import five CSV files, one json file and one xml file
    2. You would need to understand business involved behind CRM database tables. This is important
    3. Code must be in Python3/PySpark3
    4. Either your code should output something or leave the comment "#solution code here" as it is. We shall use 'Run All' in notebook and it shouldn't result error
    5. Test the entire notebook before uploading to Google Form provided
    6. You can use any Python3 library (two are imported already) or PySpark3 library. There is no restriction
    7. Output fieldname to be displayed are marked as single quaotes '' in problem statement. You should use same field alias names whereever required
    8. Notation for dataframe and/or array must be local to a problem's solution. Eg. Dataframe "test" for problem 8 should be df_prb8_test

In [6]:
import numpy as np
import pandas as pd
import os

#### imports for pyspark

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [8]:
# creating sparkcontext
spark = SparkSession.builder.master("local[*]").appName("assignmet").getOrCreate()

21/07/21 23:02:31 WARN Utils: Your hostname, AS-MAC-0048.local resolves to a loopback address: 127.0.0.1; using 192.168.50.161 instead (on interface en0)
21/07/21 23:02:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/07/21 23:02:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# import CSV here

In [9]:
# Reading all 5 csv files
df_accounts = spark.read.option("header", True).csv("./data/accounts.csv")
df_products = spark.read.option("header", True).csv("./data/products.csv")
df_sales_teams = spark.read.option("header", True).csv("./data/sales_teams.csv")
df_clicks = spark.read.option("header", True).csv("./data/clicks.csv")
df_sales_pipeline = spark.read.option("header", True).csv("./data/sales_pipeline.csv")

                                                                                

In [10]:
df_sales_pipeline.show()

+----------------+--------------+-----------------+-----------+------------+----------+-----------+----------+
|         account|opportunity_id|      sales_agent| deal_stage|     product|close_date|close_value|created_on|
+----------------+--------------+-----------------+-----------+------------+----------+-----------+----------+
|      Sunnamplex|      67HY0MW7|    Donn Cantrell|        Won|    GTXBasic|2017-05-06|      500.0|2017-04-24|
|            null|      MA82HVCI|   James Ascencio|In_Progress|      GTXPro|      null|       null|2017-06-15|
|            null|      BRL1KVVH|   Vicki Laflamme|       Lost|    GTXBasic|2017-08-03|        0.0|2017-05-19|
|           Silis|      R22O68FF|  Niesha Huffines|        Won|    GTXBasic|2017-06-27|      524.0|2017-03-21|
|           Silis|      J78AK31N|    Kami Bicknell|        Won|      MGRPFU|2017-08-04|     4794.0|2017-05-15|
|    Groovestreet|      8I9PRPGN|Versie Hillebrand|        Won|      MGRPFS|2017-05-27|       67.0|2017-04-16|
|

# import JSONs here

In [78]:
# Reading Json file
df_Orchestra = spark.read.json("./data/Orchestra.json", multiLine=True)

Refer & Use five CSVs to answer problem 1-10 below

### PROBLEM 1: Display 'Manager' and 'Grand Total Sales', for sales done by the sales agents reporting these managers

In [79]:
# join dataframe df_sales_teams and df_sales_pipeline on sales_agent and select "manager" and "close_value"columns.
# typecast close_value" from string to Double type
# Groupby the selected dataframe on "manager" with sum on "close_value"

df_sales_teams.join(df_sales_pipeline, on=['sales_agent'], how='inner').\
select("manager", "close_value").\
withColumn("close_value", df_sales_pipeline["close_value"].cast(DoubleType())).\
groupBy('manager').agg(sum('close_value').alias('Grand Total Sales')).show()

+----------------+-----------------+
|         manager|Grand Total Sales|
+----------------+-----------------+
|    Celia Rouche|        2518466.0|
|   Rocco Neubert|        3346813.0|
|   Summer Sewald|        2915362.0|
|   Melvin Marxen|        4265901.0|
|      Cara Losch|        1861751.0|
|Dustin Brinkmann|        3028635.0|
+----------------+-----------------+



### PROBLEM 2: Display 'Sales Agents' and 'Sales' for those sales where product sold at profit

In [80]:
# join 2 dataframes "df_sales_pipeline" and "df_products" on "product"
# select "sales_agent", "close_value", and "sales_price" where "close_value" > "sales_price"(profit)
# Rename "close_value" to "sales" and show 1st 20 records

df_sales_pipeline.join(df_products, on=['product'], how='inner').\
select("sales_agent", "close_value", "sales_price").\
where(col('close_value') > col('sales_price')).drop('sales_price').\
withColumnRenamed('close_value', 'sales').show()

+------------------+------+
|       sales_agent| sales|
+------------------+------+
|     Kami Bicknell|4794.0|
| Versie Hillebrand|  67.0|
|     Kami Bicknell|1480.0|
|    James Ascencio|9150.0|
| Marty Freudenburg|1380.0|
|      Reed Clapper| 660.0|
|      Reed Clapper|4750.0|
| Marty Freudenburg|  75.0|
|  Gladys Colclough|1524.0|
|      Reed Clapper|7007.0|
|    Wilburn Farren|1566.0|
|   Kary Hendrixson|1631.0|
|      Reed Clapper|6894.0|
|      Elease Gluck|4712.0|
|Jonathan Berthelot|1307.0|
|      Elease Gluck|4389.0|
|       Moses Frase|1509.0|
|   Darcel Schlecht| 875.0|
|   Niesha Huffines| 688.0|
|     Kami Bicknell|  66.0|
+------------------+------+
only showing top 20 rows



### PROBLEM 3: Display the 'Opportunity ID' and 'Days Taken to Close', for opportunities those got closed within a month

In [81]:
# Select "opportunity_id", "close_date" and "created_on" from df_deals_close
# typecast close_date" and "created_on" to Date from String
# Add a new column "Days Taken to Close" which stores the number of days taken to close the deal
# Filter the dataframe where days taken to close the deal exceeds 1 month.
# ASSUMPTIONS: 1 month = 31 days
# show 1st 20 records

df_deals_close = df_sales_pipeline.select("opportunity_id", "close_date", "created_on").\
withColumn("close_date", df_sales_pipeline["close_date"].cast(DateType())).\
withColumn("created_on", df_sales_pipeline["created_on"].cast(DateType()))

df_deals_close.withColumn("Days Taken to Close", datediff(df_deals_close.close_date, df_deals_close.created_on)).\
drop("close_date", "created_on").where(col("Days Taken to Close") < 31).show()


+--------------+-------------------+
|opportunity_id|Days Taken to Close|
+--------------+-------------------+
|      67HY0MW7|                 12|
|      4VHUTHOJ|                  1|
|      TMJ0OJ0B|                 10|
|      B22V5Z3B|                 13|
|      T8QRTV6F|                 10|
|      H7ZQUWDJ|                  9|
|      KJ1JOOQ0|                 13|
|      88MXDSGR|                  9|
|      4RE1ST7V|                 22|
|      4FJHFAH7|                  6|
|      H0NRZ2VX|                 22|
|      VB2E4FRU|                  8|
|      FLXHSKT4|                 13|
|      FJVFOQPG|                 12|
|      WHRDPR4H|                 12|
|      LWZPACKS|                  7|
|      377G0K33|                  2|
|      IF0LPAQA|                 15|
|      NEJZ68R1|                  5|
|      ZQLLEUES|                 13|
+--------------+-------------------+
only showing top 20 rows



### PROBLEM 4: Display product(s) got maximum leads (by count) generated from paid source

In [82]:
# Join df_sales_pipeline and df_clicks on column "created_on"
# select "source" and "product" 
# filter dataframe where "source" is of type "Paid"
# GropBy on product with count as agg
# sort the product in descending order to show maximum leads 1st

df_sales_pipeline.join(df_clicks, on=["created_on"], how="inner").\
select("source", "product").\
where(col("source") == "Paid").\
groupBy("product").\
count().\
sort(col("count").desc()).show()

+------------+-----+
|     product|count|
+------------+-----+
|    GTXBasic|71644|
|GTXPlusBasic|66316|
|      GTXPro|55036|
|      MGRPFS|53306|
|      MGRPFU|46500|
|  GTXPlusPro|37029|
|     GTK500U| 1409|
+------------+-----+



### PROBLEM 5: Display 'Sales Agent' and 'Opportunity Count', for those sales agents who lost atleast two opportunties

In [83]:
# Select "sales_agent" and "close_date" from "df_sales_pipeline" where "deal_stage" is of type "Lost"
# group By "sales_agent" with agg count on "sales_agent"
# rename count to "Opportunity Count" 
# Filter the dataframe where "Opportunity Count" is greater than 2
# show 1st 20 records

df_sales_pipeline.select("sales_agent", "close_date").\
where(col("deal_stage") == "Lost").groupBy("sales_agent").\
agg(count("sales_agent").alias("Opportunity Count")).\
where(col("Opportunity Count") >= 2).show()

+------------------+-----------------+
|       sales_agent|Opportunity Count|
+------------------+-----------------+
|   Darcel Schlecht|              337|
|     Kami Bicknell|              134|
|    Vicki Laflamme|              162|
|      Elease Gluck|               62|
|Jonathan Berthelot|              185|
|   Daniell Hammack|               80|
|     Anna Snelling|              140|
|      Cassey Cress|              137|
|     Garret Kinder|               63|
|    Markita Hansen|              115|
|      Reed Clapper|               87|
|Rosie Papadopoulos|               56|
|   Maureen Marcano|              119|
|  Violet Mclelland|              111|
|  Gladys Colclough|              149|
|         Boris Faz|               63|
|    Wilburn Farren|               44|
| Versie Hillebrand|              118|
| Marty Freudenburg|              120|
|    Cecily Lampkin|               86|
+------------------+-----------------+
only showing top 20 rows



### PROBLEM 6: Display in ascending order of revenue, 'Account' and 'Revenue' for telecom accounts 

In [84]:
# Filter df_clicks where "industry" is of type "Telecom"
# join "df_clicks" with "df_sales_pipeline" and "df_accounts" on "created_on" and "account" respectively
# Select "Account" and "Revenue" and group By on "Account" with agg as Sum on "Revenue"
# Sort the resulting Dataframe in ascending order of "Revenue"
# Show top 20 records

df_clicks.where(col("industry") == "Telecom").\
join(df_sales_pipeline, on=["created_on"], how="inner").\
join(df_accounts, on=['account'], how='inner').\
select("Account", "Revenue").\
groupBy("Account").\
agg(sum("Revenue").alias("Revenue")).\
sort(col("Revenue").asc()).show()
                                                                                           

+--------------------+------------------+
|             Account|           Revenue|
+--------------------+------------------+
|          Fasehatice|31161.600000000813|
|            Kan-code| 54538.29999999796|
|          Stanredtax| 70918.05000000104|
|           Konmatfix| 93330.00000000156|
|           Treequote|120103.30000000342|
|              Yearin|149309.75999999608|
|         Donquadtech|175523.04000000394|
|Olivia Pope & Ass...| 180111.6600000033|
|           Warephase|194623.79999999597|
|         Iselectrics|267140.01000000624|
|            Betatech| 288499.3199999981|
|     Sterling Cooper| 295050.2099999969|
|            Rangreen| 318580.0799999948|
|        Soylent Corp|335654.28000002034|
|              Hatfan| 363699.5799999936|
|           Ganjaflex|381847.99999998364|
|            Blackzim| 402422.4000000091|
|         Good Burger|436569.50999998517|
|            Xx-zobam| 463243.6799999794|
|               Hooli| 471548.7200000122|
+--------------------+------------

### PROBLEM 7: Display by revenue generated, bottom five 'Industries' and 'Revenue'

In [85]:
#solution code here
df_clicks.join(df_sales_pipeline, on=["created_on"], how="inner").\
join(df_accounts, on=["account"], how="inner").\
select("created_on", "industry", "account", "revenue").\
groupBy("industry").agg(sum("revenue").alias("Revenue")).\
sort(col("Revenue").asc()).show(5)

+--------------------+--------------------+
|            industry|             Revenue|
+--------------------+--------------------+
|           Education| 1.239683809399929E8|
|         Health Care| 1.613712810999844E8|
|Retail/Entertainment|1.6403756983998743E8|
|                  IT|1.9574398637998018E8|
|                SaaS|2.7779542166999525E8|
+--------------------+--------------------+
only showing top 5 rows



### PROBLEM 8: Display 'Month of Year' vs 'Sales', for GTXBasic. NOTE: "Month of Year" means month year (eg. Jan) and "Month" means month (eg. Jan 2020, Jan 2021 etc)

In [86]:
#solution code here
df_sales_pipeline.select("product", "close_value", "close_date").\
where((col("product") == "GTXBasic") & (col("close_date") != "NULL")).\
withColumn("close_date", df_sales_pipeline["close_date"].cast(DateType())).\
withColumn("Month of Year", month("close_date")).\
groupBy("Month of Year").agg(sum("close_value").alias("Sales")).\
sort(col("Month of Year").asc()).show()

+-------------+--------+
|Month of Year|   Sales|
+-------------+--------+
|            3| 28528.0|
|            4| 28541.0|
|            5| 56173.0|
|            6| 73480.0|
|            7| 85122.0|
|            8|118306.0|
|            9|163735.0|
|           10|130083.0|
|           11| 50605.0|
|           12|132490.0|
+-------------+--------+



### PROBLEM 9: Which sales agent(s) never lost a deal. Display as a dictionary {sales agent:sales}

In [87]:
#solution code here
df_sales_pipeline.select("sales_agent", "deal_stage", "close_value").\
where(col("deal_stage") != "Lost").\
groupBy("sales_agent").agg(sum("close_value").alias("sales")).show()

+------------------+---------+
|       sales_agent|    sales|
+------------------+---------+
|   Darcel Schlecht|2282609.0|
|     Kami Bicknell| 467043.0|
|    Vicki Laflamme| 617627.0|
|      Elease Gluck| 793666.0|
|Jonathan Berthelot| 456519.0|
|   Daniell Hammack| 575512.0|
|     Anna Snelling| 724285.0|
|      Cassey Cress| 879210.0|
|     Garret Kinder| 367530.0|
|    Markita Hansen| 480733.0|
|      Reed Clapper| 673723.0|
|Rosie Papadopoulos| 359671.0|
|   Maureen Marcano| 480797.0|
|  Violet Mclelland| 283498.0|
|  Gladys Colclough| 568135.0|
|         Boris Faz| 420838.0|
|    Wilburn Farren| 221465.0|
| Versie Hillebrand| 532287.0|
| Marty Freudenburg| 562287.0|
|    Cecily Lampkin| 477964.0|
+------------------+---------+
only showing top 20 rows



### PROBLEM 10: Display 'Sales Agents', 'Product', and 'Sales', for those sales agents who closed more than one deal on same day

In [88]:
# ASSUMPTION: Deal Close = (where deal_stage = WON)

df_sales_pipeline.groupBy("sales_agent", "product", "close_date", "deal_stage").\
agg(sum("close_value").alias("sales")).\
where(col("deal_stage")== "Won").\
groupBy("sales_agent", "product", "sales").\
agg(count("close_date").alias("count")).\
where(col("count") >= 2).\
drop("count").show()

+-----------------+------------+------+
|      sales_agent|     product| sales|
+-----------------+------------+------+
|    Corliss Cosme|    GTXBasic| 740.0|
|  Daniell Hammack|      MGRPFS|  52.0|
|    Hayden Neloms|      MGRPFU|  null|
|  Maureen Marcano|      MGRPFS|  73.0|
|Marty Freudenburg|GTXPlusBasic|1225.0|
|     Reed Clapper|    GTXBasic| 771.0|
|        Zane Levy|      MGRPFS|  90.0|
|Marty Freudenburg|GTXPlusBasic|1185.0|
|      Moses Frase|    GTXBasic| 467.0|
|Versie Hillebrand|      MGRPFS|  63.0|
|      Moses Frase|    GTXBasic| 797.0|
|  Rosalina Dieter|      MGRPFS|  58.0|
|  Rosalina Dieter|      MGRPFS|  78.0|
|   Markita Hansen|      MGRPFS|  45.0|
|   Vicki Laflamme|      MGRPFS|  65.0|
|   Vicki Laflamme|      MGRPFS|  54.0|
|   James Ascencio|      MGRPFS|  77.0|
|  Niesha Huffines|GTXPlusBasic| 852.0|
|   James Ascencio|      MGRPFS|  86.0|
|  Darcel Schlecht|      MGRPFS|  50.0|
+-----------------+------------+------+
only showing top 20 rows



Refer & Use Orchestra.json to answer problem 11-13 below

### PROBLEM 11: Display the instrument played by Lehmann Caroline

In [89]:
#Flatten the nested JSON to reach to the respective JOSN field

df_Orchestra_programs = df_Orchestra.withColumn("Orchestra_programs", explode_outer(col("programs"))).drop("programs")

df_Orchestra_programs_works = df_Orchestra_programs.\
withColumn("Orchestra_programs_works", explode_outer(col("Orchestra_programs.works"))).drop("Orchestra_programs")

df_Orchestra_programs_works_soloists = df_Orchestra_programs_works.\
withColumn("Orchestra_programs_works_soloists", explode_outer(col("Orchestra_programs_works.soloists"))).\
drop("Orchestra_programs_works")

df_Orchestra_programs_works_soloists = df_Orchestra_programs_works_soloists.\
withColumn("Orchestra_programs_works_soloists_soloist_name", col("Orchestra_programs_works_soloists.soloistName"))
df_Orchestra_programs_works_soloists = df_Orchestra_programs_works_soloists.\
withColumn("Orchestra_programs_works_soloists_soloist_Instrument", col("Orchestra_programs_works_soloists.soloistInstrument"))

df_Orchestra_programs_works_soloists = df_Orchestra_programs_works_soloists.drop("Orchestra_programs_works_soloists")

# Filter on soloist_name = Lehmann, Caroline
df_Orchestra_programs_works_soloists.where(col("Orchestra_programs_works_soloists_soloist_name") == "Lehmann, Caroline").show(1)


+----------------------------------------------+----------------------------------------------------+
|Orchestra_programs_works_soloists_soloist_name|Orchestra_programs_works_soloists_soloist_Instrument|
+----------------------------------------------+----------------------------------------------------+
|                             Lehmann, Caroline|                                             Soprano|
+----------------------------------------------+----------------------------------------------------+
only showing top 1 row



### PROBLEM 12: Display all vocalists

In [90]:
df_Orchestra_programs_works_soloists.\
where(col("Orchestra_programs_works_soloists_soloist_Instrument") == "Vocalist").\
distinct().show()

+----------------------------------------------+----------------------------------------------------+
|Orchestra_programs_works_soloists_soloist_name|Orchestra_programs_works_soloists_soloist_Instrument|
+----------------------------------------------+----------------------------------------------------+
|                                 Groebl, Marie|                                            Vocalist|
|                                  Lankow, Anna|                                            Vocalist|
|                           Cutter-Savage, Ruby|                                            Vocalist|
|                                  Doenhoff von|                                            Vocalist|
|                                          Ford|                                            Vocalist|
|                                Anderson, Sara|                                            Vocalist|
|                               Heinrich, Julia|                                  

### PROBLEM 13: Display orchestra played under program id 2561

In [91]:
# Flatten Dataframe
df_Orchestra_programs = df_Orchestra_programs.withColumn("Orchestra_programs_programID",  
                                                         col("Orchestra_programs.programID"))
df_Orchestra_programs = df_Orchestra_programs.withColumn("Orchestra_programs_orchestra", 
                                                         col("Orchestra_programs.orchestra"))

# Filter on Orchestra_programs_programID 2561 and show Orchestra_programs_orchestra
df_Orchestra_programs.drop("Orchestra_programs").where(col("Orchestra_programs_programID") == "2561").show()


+----------------------------+----------------------------+
|Orchestra_programs_programID|Orchestra_programs_orchestra|
+----------------------------+----------------------------+
|                        2561|        New York Philharm...|
+----------------------------+----------------------------+



### Refer & Use Orchestra.xml to answer problem 14-15 below

In [92]:
# Read XML file
# ASSUMPTION: used com.databricks.spark.xml jar package 

df_Orchestra_xml = spark.read \
    .format("com.databricks.spark.xml") \
    .option("rootTag", "programs") \
    .option("rowTag", "program") \
    .load("./data/Orchestra.xml")


### PROBLEM 14: Display locations used for event at time 8:15 PM

In [93]:
# Flatten nested dataframe

df_Orchestra_xml_flat = df_Orchestra_xml.withColumn("concertInfo_location", explode_outer(col("concertInfo.location")))
df_Orchestra_xml_flat = df_Orchestra_xml_flat.withColumn("concertInfo_time", explode_outer(col("concertInfo.time")))

#Filter on concertInfo_time = 8:15PM and select all distinct locations
df_Orchestra_xml_flat.where(col("concertInfo_time") == "8:15PM").select("concertInfo_location").distinct().show()


+--------------------+
|concertInfo_location|
+--------------------+
|        Hartford, CT|
|        Brooklyn, NY|
|     Springfield, MA|
|    Indianapolis, IN|
|      Providence, RI|
|          Newark, NJ|
|       Manhattan, NY|
|    Philadelphia, PA|
|       New Haven, CT|
+--------------------+



Follow these stpes to Resolve Error: java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.xml
* Download spark-xml .jar file from https://mvnrepository.com/artifact/com.databricks/spark-xml_2.11/0.6.0

In [94]:
import os
os.path

<module 'posixpath' from '/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/posixpath.py'>

* Copy `spark-xml` jar file to `/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/jars` 

The target location is prepared with the help of output from `os.path`

### PROBLEM 15: Display total number of programs

In [95]:
df_Orchestra_xml.count()

1033

********************************************* Test ends here **************************************************