In [65]:
# import packages
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
import json

In [66]:
# intialize spark session
spark = SparkSession.builder.appName(
    "Spark job for flattening JSON data"
).getOrCreate()

In [67]:
# load leads data
leads_df = (
        spark.read.option("inferSchema", "true").option("header", "true")
        .format("csv")
        .load("leads_export.csv")
    ) 

                                                                                

In [68]:
# show top 5 rows
leads_df.show(5)

+--------------------+----------+---------+--------------------+--------------------+-------------------+
|             company|first_name|last_name|               title|             country| operator_timestamp|
+--------------------+----------+---------+--------------------+--------------------+-------------------+
|       Hernandez Ltd|   Spencer|   Conner|Local Government ...|             Denmark|2021-08-31 00:00:00|
|          Nelson PLC|     Aaron|    Grant|Technical Sales E...|     Solomon Islands|2022-10-26 00:00:00|
|Marks, Swanson an...|      Dana|    Hayes|Environmental Hea...|Falkland Islands ...|2021-08-14 00:00:00|
|Simpson, Singleto...|    Teresa|  Pearson|Communications En...|        Saudi Arabia|2021-05-02 00:00:00|
|Shepherd, Collins...|      Greg|    Green|        Town Planner|           Venezuela|2021-08-31 00:00:00|
+--------------------+----------+---------+--------------------+--------------------+-------------------+
only showing top 5 rows



In [69]:
['company', 'first_name', 'last_name', "title", 'country', "operator_timestamp"]

['company',
 'first_name',
 'last_name',
 'title',
 'country',
 'operator_timestamp']

In [70]:
# load companies data
companies_df = (
        spark.read.option("inferSchema", "true").option("header", "true")
        .format("csv")
        .load("company_export.csv")
    ) 

In [71]:
# show top 6 rows
companies_df.show(6)

+--------------------+-------------+
|             company|  hq_location|
+--------------------+-------------+
|       Hernandez Ltd| South Africa|
|          Nelson PLC|      Vietnam|
|Marks, Swanson an...|       Serbia|
|Simpson, Singleto...|      Uruguay|
|Shepherd, Collins...|Guinea-Bissau|
|       Estrada-Lewis|   San Marino|
+--------------------+-------------+
only showing top 6 rows



In [72]:
# Ordering the leads by timestamp
leads_df = leads_df.sort("operator_timestamp")

In [73]:
# count leads before removing duplicates
leads_df.count()

1000000

In [74]:
# drop duplicates based on the constraint fields
leads_df = leads_df.dropDuplicates(['company', 'first_name', 'last_name', 'country'])

In [75]:
# count leads after removing duplicates
leads_df.count()

                                                                                

999999

In [76]:
# count companies before removing duplicates
companies_df.count()

1000001

In [77]:
companies_df = companies_df.dropDuplicates(['company', 'hq_location'])

In [57]:
# count companies after removing duplicates
companies_df.count()

                                                                                

977654

In [59]:
# create tempView for leads
leads_df.createOrReplaceGlobalTempView("leads")

In [60]:
# create tempView for companies
companies_df.createOrReplaceGlobalTempView("companies")

## Questions

In [61]:
# Get total number leads gotten each month in 2022

total_leads_gotten_by_year =spark.sql("""

SELECT MONTH(operator_timestamp) MONTH, COUNT(*) TOTAL_LEADS_GOTTEN
FROM global_temp.leads
WHERE YEAR(operator_timestamp)=YEAR(NOW())
GROUP BY MONTH(operator_timestamp)
ORDER BY 1


""")
total_leads_gotten_by_year.show()




+-----+------------------+
|MONTH|TOTAL_LEADS_GOTTEN|
+-----+------------------+
|    1|             44444|
|    2|             40001|
|    3|             44371|
|    4|             42934|
|    5|             44801|
|    6|             42803|
|    7|             44620|
|    8|             44588|
|    9|             42831|
|   10|             43934|
|   11|             41147|
+-----+------------------+



                                                                                

In [62]:
# Get the number leads captured in the past 10 days
last_10_days = spark.sql("""

SELECT operator_timestamp DATE, COUNT(*) TOTAL_LEADS_GOTTEN
FROM global_temp.leads
WHERE operator_timestamp >= NOW() - INTERVAL '10' days 
AND operator_timestamp < NOW()
GROUP BY 1
ORDER BY 1


""")
last_10_days.show()



+-------------------+------------------+
|               DATE|TOTAL_LEADS_GOTTEN|
+-------------------+------------------+
|2022-11-14 00:00:00|              1445|
|2022-11-15 00:00:00|              1439|
|2022-11-16 00:00:00|              1489|
|2022-11-17 00:00:00|              1454|
|2022-11-18 00:00:00|              1446|
|2022-11-19 00:00:00|              1455|
|2022-11-20 00:00:00|              1418|
|2022-11-21 00:00:00|              1426|
|2022-11-22 00:00:00|              1388|
|2022-11-23 00:00:00|              1465|
+-------------------+------------------+



                                                                                

In [63]:
# Top 20 countries with the highest leads so far
leads_per_country = spark.sql("""

SELECT country COUNTRY, COUNT(*) COUNT
FROM global_temp.leads
GROUP BY 1
ORDER BY 2 DESC
LIMIT 20


""")
leads_per_country.show()



+--------------------+-----+
|             COUNTRY|COUNT|
+--------------------+-----+
|               Congo| 8442|
|               Korea| 8208|
|               Yemen| 4243|
|            Pakistan| 4222|
|Cocos (Keeling) I...| 4205|
|   Brunei Darussalam| 4203|
|      Norfolk Island| 4199|
|               Nepal| 4199|
|               Macao| 4198|
|             Denmark| 4198|
|             Armenia| 4193|
|             Reunion| 4184|
|          Madagascar| 4183|
|               Aruba| 4182|
|             Burundi| 4178|
|          Azerbaijan| 4177|
|             Uruguay| 4176|
|           Mauritius| 4175|
|            Bulgaria| 4171|
|              Kuwait| 4169|
+--------------------+-----+



                                                                                

In [64]:
# How many staffs are in the HQ of company? 

hq_staffs = spark.sql("""

SELECT c.company COMPANY, COUNT(*) COUNT
FROM global_temp.leads l
LEFT JOIN global_temp.companies c ON c.hq_location = l.country AND c.company = l.company

WHERE c.company IS NOT NULL

GROUP BY 1
ORDER BY 2 DESC


""")
hq_staffs.show(10)



+----------------+-----+
|         COMPANY|COUNT|
+----------------+-----+
|       Smith PLC| 1138|
|       Smith Ltd| 1134|
|       Smith LLC| 1097|
|       Smith Inc| 1088|
|     Smith Group| 1063|
|  Smith and Sons| 1023|
|Johnson and Sons|  862|
|     Johnson Ltd|  823|
|     Johnson LLC|  815|
|     Johnson PLC|  787|
+----------------+-----+
only showing top 10 rows



                                                                                

In [15]:
def save_as_parquet(dataframe, file_path, mode="append"):
    """Save a dataframe as a parquet file."""
    dataframe.write.mode(mode).parquet(file_path)


In [16]:
save_as_parquet(total_leads_gotten_by_year, "output/total_leads_gotten_by_year.parquet")

                                                                                

In [17]:
save_as_parquet(last_10_days, "output/last_10_days.parquet")

In [18]:
save_as_parquet(leads_per_country, "output/leads_per_country.parquet")

                                                                                

In [19]:
save_as_parquet(hq_staffs, "output/hq_staffs.parquet")

                                                                                