## Overview

This notebook shows how to create and query a table or DataFrame from files that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html), the Databricks File System, lets you store data for querying inside Databricks. The notebook assumes that the files you want to read are already in DBFS.

This notebook is written in **Python**, so the default cell type is Python. You can switch a cell to another language with the `%LANGUAGE` syntax; Python, Scala, SQL, and R are all supported.

In [0]:
File uploaded to /FileStore/tables/campaign.csv
File uploaded to /FileStore/tables/campaign_response.csv
File uploaded to /FileStore/tables/campaign_results.csv
File uploaded to /FileStore/tables/customer.cs

In [0]:
from pyspark.sql import SparkSession
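from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col  # explicit import avoids shadowing built-ins such as sum and max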

In [0]:
# Define schema for customer
customer_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("has_credit_card", StringType(), True),
    StructField("has_checking_account", StringType(), True),
    StructField("has_debit_account", StringType(), True),
    StructField("state", StringType(), True),
    StructField("city", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("education_level", StringType(), True),
    StructField("visits_last_3_months", IntegerType(), True),
    StructField("visits_last_12_months", IntegerType(), True)
])

customer_path = '/FileStore/tables/customer.csv'
customer_df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'false')\
    .schema(customer_schema)\
    .load(customer_path)

customer_df.printSchema()

customer_df.show(5)

root
 |-- customer_id: integer (nullable = true)
 |-- has_credit_card: string (nullable = true)
 |-- has_checking_account: string (nullable = true)
 |-- has_debit_account: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- education_level: string (nullable = true)
 |-- visits_last_3_months: integer (nullable = true)
 |-- visits_last_12_months: integer (nullable = true)

+-----------+---------------+--------------------+-----------------+--------+-------------+------+---+---------------+--------------------+---------------------+
|customer_id|has_credit_card|has_checking_account|has_debit_account|   state|         city|gender|age|education_level|visits_last_3_months|visits_last_12_months|
+-----------+---------------+--------------------+-----------------+--------+-------------+------+---+---------------+--------------------+---------------------+
|          1|  

In [0]:
# Define schema for campaign results
campaign_results_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("campaign_id", IntegerType(), True),
    StructField("num_products_before_campaign", IntegerType(), True),
    StructField("num_products_after_campaign", IntegerType(), True),
    StructField("new_credit_application_received", IntegerType(), True)
])

campaign_results_path = '/FileStore/tables/campaign_results.csv'
campaign_results_df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'false')\
    .schema(campaign_results_schema)\
    .load(campaign_results_path)

campaign_results_df.printSchema()

campaign_results_df.show(5)

root
 |-- customer_id: integer (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- num_products_before_campaign: integer (nullable = true)
 |-- num_products_after_campaign: integer (nullable = true)
 |-- new_credit_application_received: integer (nullable = true)

+-----------+-----------+----------------------------+---------------------------+-------------------------------+
|customer_id|campaign_id|num_products_before_campaign|num_products_after_campaign|new_credit_application_received|
+-----------+-----------+----------------------------+---------------------------+-------------------------------+
|        683|          2|                           3|                         83|                              1|
|        942|          1|                           4|                         57|                              0|
|        462|          2|                           3|                         25|                              0|
|        942|          3|      

In [0]:
# Define schema for campaign response
campaign_response_schema = StructType([
    StructField("campaign_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("response_received_in_24_hrs", IntegerType(), True),
    StructField("response_received_after_24_hrs", IntegerType(), True),
])

campaign_response_path = '/FileStore/tables/campaign_response.csv'
campaign_response_df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'false')\
    .schema(campaign_response_schema)\
    .load(campaign_response_path)

campaign_response_df.printSchema()

campaign_response_df.show(5)

root
 |-- campaign_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- response_received_in_24_hrs: integer (nullable = true)
 |-- response_received_after_24_hrs: integer (nullable = true)

+-----------+-----------+---------------------------+------------------------------+
|campaign_id|customer_id|response_received_in_24_hrs|response_received_after_24_hrs|
+-----------+-----------+---------------------------+------------------------------+
|          2|        483|                          0|                             0|
|          4|        434|                          0|                             0|
|          4|        472|                          1|                             1|
|          4|        385|                          0|                             1|
|          2|        400|                          1|                             0|
+-----------+-----------+---------------------------+------------------------------+
only showing top 5
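
The two flag columns above drive the "responded to any campaign" filter applied later in this notebook. As a minimal plain-Python sketch (no Spark required), the same condition can be checked against the five sample rows shown in the output above. The helper name `responded` is hypothetical and exists only for this illustration.

```python
# Sample rows copied from the campaign_response_df.show(5) output above:
# (campaign_id, customer_id, response_received_in_24_hrs, response_received_after_24_hrs)
sample_responses = [
    (2, 483, 0, 0),
    (4, 434, 0, 0),
    (4, 472, 1, 1),
    (4, 385, 0, 1),
    (2, 400, 1, 0),
]


def responded(row):
    """Hypothetical helper: True if either response flag is set to 1."""
    _, _, in_24_hrs, after_24_hrs = row
    return in_24_hrs == 1 or after_24_hrs == 1


# Keep the customer_id of every row that passes the condition
responded_customer_ids = [row[1] for row in sample_responses if responded(row)]
print(responded_customer_ids)  # [472, 385, 400]
```

Rows where both flags are 0 (customers 483 and 434 here) are dropped, which is the behavior expected from combining the two column comparisons with `|` in PySpark.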

In [0]:
# Define schema for campaign info
campaign_schema = StructType([
    StructField("campaign_id", IntegerType(), True),
    StructField("details", StringType(), True)
])

campaign_path = '/FileStore/tables/campaign.csv'
campaign_df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'false')\
    .schema(campaign_schema)\
    .load(campaign_path)

campaign_df.printSchema()

# truncate=False prints the full contents of each column
campaign_df.show(truncate=False)


root
 |-- campaign_id: integer (nullable = true)
 |-- details: string (nullable = true)

+-----------+----------------------------------------------------+
|campaign_id|details                                             |
+-----------+----------------------------------------------------+
|1          |Call us today at 1-800-9090090 (get $100 gift card )|
|2          |Send us a text at 24567 ( 10000 points)             |
|3          |Talk to us to win a $100 gift card                  |
|4          |Send us an email                                    |
+-----------+----------------------------------------------------+



In [0]:
# Keeping only the response rows where the customer responded, either within or after 24 hours
responded_customers_df = campaign_response_df.filter(
    (campaign_response_df["response_received_in_24_hrs"] == 1) |
    (campaign_response_df["response_received_after_24_hrs"] == 1)
)

# Joining with customer_df to get customer details (one row per response,
# so a customer who responded to several campaigns appears more than once)
top_customers_df = responded_customers_df.join(customer_df, on="customer_id", how="inner")


# Selecting the desired columns
output1_df = top_customers_df.select(
    "customer_id",
    "age",
    "city",
    "has_credit_card",
    "gender"
)

output1_df.show()

+-----------+---+--------------------+---------------+------+
|customer_id|age|                city|has_credit_card|gender|
+-----------+---+--------------------+---------------+------+
|          1| 21|            Marietta|          false|Female|
|          2| 28|       New York City|          false|  Male|
|          5| 30|            Hamilton|           true|Female|
|          5| 30|            Hamilton|           true|Female|
|          5| 30|            Hamilton|           true|Female|
|          5| 30|            Hamilton|           true|Female|
|          6| 25|                Reno|          false|Female|
|          8| 27|Hot Springs Natio...|           true|  Male|
|         10| 21|            Sterling|          false|  Male|
|         10| 21|            Sterling|          false|  Male|
|         10| 21|            Sterling|          false|  Male|
|         12| 24|            Santa Fe|           true|Female|
|         13| 28|           Las Vegas|          false|  Male|
|       

In [0]:
# Filtering for female customers in New York City
filtered_top_customers_df = top_customers_df.filter(
    (top_customers_df["gender"] == "Female") &
    (top_customers_df["city"] == "New York City")
)

filtered_top_customers_df.show()


+-----------+-----------+---------------------------+------------------------------+---------------+--------------------+-----------------+--------+-------------+------+---+---------------+--------------------+---------------------+
|customer_id|campaign_id|response_received_in_24_hrs|response_received_after_24_hrs|has_credit_card|has_checking_account|has_debit_account|   state|         city|gender|age|education_level|visits_last_3_months|visits_last_12_months|
+-----------+-----------+---------------------------+------------------------------+---------------+--------------------+-----------------+--------+-------------+------+---+---------------+--------------------+---------------------+
|         53|          1|                          0|                             1|          false|                true|            false|New York|New York City|Female| 23|    high_school|                   2|                   90|
|         53|          2|                          0|               

In [0]:

# Summing responses per (customer_id, campaign_id) pair and keeping the 50 pairs with the most responses
top_50_female_nyc_customers = filtered_top_customers_df \
    .groupBy("customer_id","campaign_id") \
    .agg({"response_received_in_24_hrs": "sum", "response_received_after_24_hrs": "sum"}) \
    .withColumnRenamed("sum(response_received_in_24_hrs)", "response_in_24_hrs") \
    .withColumnRenamed("sum(response_received_after_24_hrs)", "response_after_24_hrs") \
    .orderBy((col("response_in_24_hrs") + col("response_after_24_hrs")).desc()) \
    .limit(50)

top_50_female_nyc_customers.show()


+-----------+-----------+------------------+---------------------+
|customer_id|campaign_id|response_in_24_hrs|response_after_24_hrs|
+-----------+-----------+------------------+---------------------+
|        399|          4|                 1|                    1|
|        803|          2|                 1|                    1|
|        891|          4|                 1|                    1|
|        134|          1|                 0|                    1|
|         53|          1|                 0|                    1|
|        134|          3|                 1|                    0|
|         53|          2|                 0|                    1|
|        891|          3|                 1|                    0|
|        134|          4|                 1|                    0|
+-----------+-----------+------------------+---------------------+
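
Because the grouping above is by `customer_id` and `campaign_id`, each row is a customer-campaign pair rather than a customer. The following plain-Python sketch (no Spark required) rolls the nine rows shown above up to one total per customer. The tie-breaking rule (ascending `customer_id`) is an assumption made for this illustration, not something the notebook specifies.

```python
from collections import defaultdict

# Rows copied from the top_50_female_nyc_customers.show() output above:
# (customer_id, campaign_id, response_in_24_hrs, response_after_24_hrs)
pair_rows = [
    (399, 4, 1, 1),
    (803, 2, 1, 1),
    (891, 4, 1, 1),
    (134, 1, 0, 1),
    (53, 1, 0, 1),
    (134, 3, 1, 0),
    (53, 2, 0, 1),
    (891, 3, 1, 0),
    (134, 4, 1, 0),
]

# Sum both response counts per customer across all campaigns
totals = defaultdict(int)
for customer_id, _campaign_id, in_24, after_24 in pair_rows:
    totals[customer_id] += in_24 + after_24

# Sort by total responses (descending), then customer_id (ascending, assumed tie-break)
ranked = sorted(totals.items(), key=lambda item: (-item[1], item[0]))
print(ranked)  # [(134, 3), (891, 3), (53, 2), (399, 2), (803, 2)]
```

In PySpark, a comparable per-customer ranking would group by `customer_id` alone instead of by both columns.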



In [0]:
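# Attach every campaign response to the matching customer's details
customers_df = customer_df.join(campaign_response_df, on="customer_id", how="inner")
# Add the campaign description for each response
customers_name_df = customers_df.join(campaign_df, on="campaign_id", how="inner")
# Left-join the top female NYC responders on campaign_id only, so the result has two customer_id columns
output2_df = customers_name_df.join(top_50_female_nyc_customers, on="campaign_id", how="left")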

output2_df.show()

+-----------+-----------+---------------+--------------------+-----------------+--------+--------------------+------+---+---------------+--------------------+---------------------+---------------------------+------------------------------+--------------------+-----------+------------------+---------------------+
|campaign_id|customer_id|has_credit_card|has_checking_account|has_debit_account|   state|                city|gender|age|education_level|visits_last_3_months|visits_last_12_months|response_received_in_24_hrs|response_received_after_24_hrs|             details|customer_id|response_in_24_hrs|response_after_24_hrs|
+-----------+-----------+---------------+--------------------+-----------------+--------+--------------------+------+---+---------------+--------------------+---------------------+---------------------------+------------------------------+--------------------+-----------+------------------+---------------------+
|          4|          1|          false|               fa

In [0]:
# Joining responders with customer details to get each responder's education level
responded_education_df = responded_customers_df.join(customer_df, on="customer_id", how="inner")

# Group by education level and campaign, then count the response rows in each group
customers_per_education_level = responded_education_df \
    .groupBy("education_level","campaign_id") \
    .count() \
    .orderBy("count", ascending=False)

output3_df = customers_name_df.join(customers_per_education_level, on="campaign_id", how="left")

output3_df.show()

+-----------+-----------+---------------+--------------------+-----------------+--------+-------------+------+---+---------------+--------------------+---------------------+---------------------------+------------------------------+--------------------+---------------+-----+
|campaign_id|customer_id|has_credit_card|has_checking_account|has_debit_account|   state|         city|gender|age|education_level|visits_last_3_months|visits_last_12_months|response_received_in_24_hrs|response_received_after_24_hrs|             details|education_level|count|
+-----------+-----------+---------------+--------------------+-----------------+--------+-------------+------+---+---------------+--------------------+---------------------+---------------------------+------------------------------+--------------------+---------------+-----+
|          4|          1|          false|               false|            false| Georgia|     Marietta|Female| 21|    high_school|                   9|                   61