In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,LongType,StringType,DateType,BooleanType,DoubleType,IntegerType
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder.appName("demo").getOrCreate()

In [3]:

from google.colab import drive
drive.mount('/content/drive')


data_path1 = "/content/drive/MyDrive/Colab Notebooks/intw_assignments/1/dataset/subscription_information.csv"
data_path2 = "/content/drive/MyDrive/Colab Notebooks/intw_assignments/1/dataset/payment_information.csv"
data_path3 = "/content/drive/MyDrive/Colab Notebooks/intw_assignments/1/dataset/industry_client_details.csv"
data_path4 = "/content/drive/MyDrive/Colab Notebooks/intw_assignments/1/dataset/finanical_information.csv"


Mounted at /content/drive


In [4]:
#providing the schema before hand so as to increase read speed if the data size is high

schema1 = StructType(
    [
        StructField('client_id',LongType(),nullable=True),
        StructField('subscription_type',StringType(),nullable=True),
        StructField('start_date',StringType(),nullable=True),
        StructField('end_date',StringType(),nullable=True),
        StructField('renewed',BooleanType(),nullable = True)
    ]
)

In [5]:
subs_info = spark.read.option('header','true').option('dateFormat','yyyy-MM-dd').schema(schema1).csv(data_path1)

In [None]:
subs_info.show()

+----------+-----------------+----------+----------+-------+
| client_id|subscription_type|start_date|  end_date|renewed|
+----------+-----------------+----------+----------+-------+
|1131383004|           Yearly|2020-11-11|2021-11-11|  false|
|4309371709|          Monthly|2021-05-24|2021-06-23|   true|
|3183675157|           Yearly|2021-12-25|2022-12-25|   true|
|5371694837|          Monthly|2020-03-14|2020-04-13|   true|
|5157113076|          Monthly|2019-11-07|2019-12-07|  false|
|7896208406|           Yearly|2022-02-24|2023-02-24|   true|
|4687291312|           Yearly|2019-06-14|2020-06-13|   true|
|7744303708|           Yearly|2022-10-11|2023-10-11|   true|
|6038028440|           Yearly|2018-12-03|2019-12-03|   true|
|6955217420|          Monthly|2019-07-07|2019-08-06|   true|
|7513745319|           Yearly|2020-09-17|2021-09-17|  false|
|6209923307|          Monthly|2021-07-05|2021-08-04|   true|
|2083869816|          Monthly|2019-05-16|2019-06-15|  false|
|7207890733|          Mo

In [6]:
#providing the schema before hand so as to increase read speed if the data size is high

schema2 = StructType(
    [
        StructField('client_id',LongType(),True),
        StructField('payment_date',DateType(),True),
        StructField('Amount_paid',DoubleType(),True),
        StructField('Payment_method',StringType(),True)
    ]
)

In [7]:
payment_info = spark.read.option('header','true').option('dateFormat','M/d/yyyy').schema(schema2).csv(data_path2)

In [None]:
payment_info.show()

+----------+------------+-----------+--------------+
| client_id|payment_date|Amount_paid|Payment_method|
+----------+------------+-----------+--------------+
|6292156167|  2019-09-16|      447.0| Bank Transfer|
|7462725203|  2018-05-21|      379.7| Bank Transfer|
|4698004907|  2021-09-11|      435.1|         Check|
|3510240337|  2020-12-07|      413.1|         Check|
|7501599785|  2019-03-04|       61.1| Bank Transfer|
|8719792472|  2018-02-10|       73.8|   Credit Card|
|3325348894|  2019-07-09|      348.1|   Credit Card|
|9031632460|  2019-07-06|      222.9|         Check|
|5319487809|  2019-03-02|       90.7| Bank Transfer|
|4280387012|  2022-11-25|       77.1|         Check|
|3891000577|  2020-08-16|      241.2| Bank Transfer|
|5396477027|  2021-02-13|      494.6|         Check|
|1089413096|  2019-04-21|      453.6|   Credit Card|
|5536015034|  2019-11-15|       65.4|   Credit Card|
|7513745319|  2020-09-17|      284.5|   Credit Card|
|3878940490|  2018-05-23|      489.4| Bank Tra

In [8]:
#providing the schema before hand so as to increase read speed if the data size is high

schema3 = StructType(
    [
        StructField('client_id',LongType(),True),
        StructField('company_size',StringType(),True),
        StructField('industry',StringType(),True),
        StructField('location',StringType(),True)
    ]
)

In [9]:
industry_client = spark.read.option('header','true').schema(schema3).csv(data_path3)

In [None]:
industry_client.show()

+----------+------------+---------------+---------+
| client_id|company_size|       industry| location|
+----------+------------+---------------+---------+
|4280387012|       Large|Finance Lending|   Mumbai|
|2095513148|       Small|Finance Lending|  Chennai|
|7225516707|      Medium|Finance Lending|New Delhi|
|8093537819|       Large|    Block Chain|   Mumbai|
|4387541014|      Medium|    Hyper Local| Banglore|
|5698091148|       Large|    Block Chain| Banglore|
|8884551090|       Large|             AI|Hyderabad|
|9598980006|      Medium|             AI|New Delhi|
|7207890733|       Large|    Block Chain|Hyderabad|
|5059906722|       Small|             AI| Banglore|
|9166568761|       Small|Finance Lending|Hyderabad|
|1131383004|       Large|Finance Lending| Banglore|
|9699223737|       Small|    Block Chain|   Mumbai|
|3325348894|       Small|    Hyper Local| Banglore|
|9714663815|       Small|             AI|New Delhi|
|6296057401|       Small|Finance Lending|  Chennai|
|5387264885|

In [10]:
#providing the schema before hand so as to increase read speed if the data size is high

schema4 = StructType(
    [
        StructField('id',IntegerType(),True),
        StructField('start_date',DateType(),True),
        StructField('end_date',DateType(),True),
        StructField('inflation_rate',DoubleType(),True),
        StructField('gdp_growth_rate',DoubleType(),True)
    ]
)

In [11]:
financial_info = spark.read.option('header','true').option('dateFormat','yyyy-MM-dd').schema(schema4).csv(data_path4)
financial_info.show()

+---+----------+----------+--------------+---------------+
| id|start_date|  end_date|inflation_rate|gdp_growth_rate|
+---+----------+----------+--------------+---------------+
|  0|2018-01-01|2018-03-31|          5.77|           3.51|
|  1|2018-04-01|2018-06-30|          1.17|           2.15|
|  2|2018-07-01|2018-09-30|          1.56|           1.82|
|  3|2018-10-01|2018-12-31|          2.78|           2.43|
|  4|2019-01-01|2019-03-31|          6.91|           3.44|
|  5|2019-04-01|2019-06-30|          3.84|           3.48|
|  6|2019-07-01|2019-09-30|          7.71|           1.35|
|  7|2019-10-01|2019-12-31|          2.71|           1.79|
|  8|2020-01-01|2020-03-31|           4.4|           1.36|
|  9|2020-04-01|2020-06-30|          4.69|           1.23|
| 10|2020-07-01|2020-09-30|          1.43|            2.4|
| 11|2020-10-01|2020-12-31|           3.1|            2.7|
| 12|2021-01-01|2021-03-31|          2.57|           1.18|
| 13|2021-04-01|2021-06-30|          0.76|           3.6

observation

date are in different format

in data modellling stage can be solved using a date_dim for standardization

In [None]:
#q1
#How many finance lending and blockchain clients does the organization have?


In [None]:
industry_client.show()

+----------+------------+---------------+---------+
| client_id|company_size|       industry| location|
+----------+------------+---------------+---------+
|4280387012|       Large|Finance Lending|   Mumbai|
|2095513148|       Small|Finance Lending|  Chennai|
|7225516707|      Medium|Finance Lending|New Delhi|
|8093537819|       Large|    Block Chain|   Mumbai|
|4387541014|      Medium|    Hyper Local| Banglore|
|5698091148|       Large|    Block Chain| Banglore|
|8884551090|       Large|             AI|Hyderabad|
|9598980006|      Medium|             AI|New Delhi|
|7207890733|       Large|    Block Chain|Hyderabad|
|5059906722|       Small|             AI| Banglore|
|9166568761|       Small|Finance Lending|Hyderabad|
|1131383004|       Large|Finance Lending| Banglore|
|9699223737|       Small|    Block Chain|   Mumbai|
|3325348894|       Small|    Hyper Local| Banglore|
|9714663815|       Small|             AI|New Delhi|
|6296057401|       Small|Finance Lending|  Chennai|
|5387264885|

In [None]:
#by defining a list in this manner we can make our filtratin dynamic
industries = ['Finance Lending','Block Chain']

#search in the list
#and if it is there then agg and count the distinct id's
q1 = industry_client.filter(F.col('industry').isin(industries)) \
      .groupBy(F.col('industry')).agg(
          F.countDistinct('client_id').alias('client_count')
      )
q1.show()

+---------------+------------+
|       industry|client_count|
+---------------+------------+
|Finance Lending|          22|
|    Block Chain|          25|
+---------------+------------+



In [None]:
#q2
#Which industry in the organization has the highest renewal rate?

In [None]:
subs_info.show()

+----------+-----------------+----------+----------+-------+
| client_id|subscription_type|start_date|  end_date|renewed|
+----------+-----------------+----------+----------+-------+
|1131383004|           Yearly|2020-11-11|2021-11-11|  false|
|4309371709|          Monthly|2021-05-24|2021-06-23|   true|
|3183675157|           Yearly|2021-12-25|2022-12-25|   true|
|5371694837|          Monthly|2020-03-14|2020-04-13|   true|
|5157113076|          Monthly|2019-11-07|2019-12-07|  false|
|7896208406|           Yearly|2022-02-24|2023-02-24|   true|
|4687291312|           Yearly|2019-06-14|2020-06-13|   true|
|7744303708|           Yearly|2022-10-11|2023-10-11|   true|
|6038028440|           Yearly|2018-12-03|2019-12-03|   true|
|6955217420|          Monthly|2019-07-07|2019-08-06|   true|
|7513745319|           Yearly|2020-09-17|2021-09-17|  false|
|6209923307|          Monthly|2021-07-05|2021-08-04|   true|
|2083869816|          Monthly|2019-05-16|2019-06-15|  false|
|7207890733|          Mo

In [12]:
industry_client.show()

+----------+------------+---------------+---------+
| client_id|company_size|       industry| location|
+----------+------------+---------------+---------+
|4280387012|       Large|Finance Lending|   Mumbai|
|2095513148|       Small|Finance Lending|  Chennai|
|7225516707|      Medium|Finance Lending|New Delhi|
|8093537819|       Large|    Block Chain|   Mumbai|
|4387541014|      Medium|    Hyper Local| Banglore|
|5698091148|       Large|    Block Chain| Banglore|
|8884551090|       Large|             AI|Hyderabad|
|9598980006|      Medium|             AI|New Delhi|
|7207890733|       Large|    Block Chain|Hyderabad|
|5059906722|       Small|             AI| Banglore|
|9166568761|       Small|Finance Lending|Hyderabad|
|1131383004|       Large|Finance Lending| Banglore|
|9699223737|       Small|    Block Chain|   Mumbai|
|3325348894|       Small|    Hyper Local| Banglore|
|9714663815|       Small|             AI|New Delhi|
|6296057401|       Small|Finance Lending|  Chennai|
|5387264885|

In [24]:
#attach the industry type to the subscription
merged_df = subs_info.alias('t1').join(industry_client.alias('t2'),on = F.col('t1.client_id') == F.col('t2.client_id'),how='inner')
merged_df.show()

+----------+-----------------+----------+----------+-------+----------+------------+---------------+---------+
| client_id|subscription_type|start_date|  end_date|renewed| client_id|company_size|       industry| location|
+----------+-----------------+----------+----------+-------+----------+------------+---------------+---------+
|4280387012|           Yearly|2022-11-25|2023-11-25|   true|4280387012|       Large|Finance Lending|   Mumbai|
|2095513148|          Monthly|2021-11-03|2021-12-03|  false|2095513148|       Small|Finance Lending|  Chennai|
|7225516707|           Yearly|2021-01-19|2022-01-19|   true|7225516707|      Medium|Finance Lending|New Delhi|
|8093537819|          Monthly|2019-09-14|2019-10-14|  false|8093537819|       Large|    Block Chain|   Mumbai|
|4387541014|          Monthly|2018-11-08|2018-12-08|  false|4387541014|      Medium|    Hyper Local| Banglore|
|5698091148|          Monthly|2020-08-04|2020-09-03|   true|5698091148|       Large|    Block Chain| Banglore|
|

In [25]:
#group by industry
#if in a grp if renewed is true set as 1 else 0
#then do sum(of all 1's)/total occurence in a grp
#this gives the renewal rate
#then order it in descending order
filter_df2 = merged_df.groupBy(F.col('industry')).agg(
    (F.sum(F.when(F.col('renewed') == True,1).otherwise(0))/F.count('*')).alias('renewal_rate') \
).orderBy(F.desc(F.col('renewal_rate')))
filter_df2.first()

Row(industry='Gaming', renewal_rate=0.7272727272727273)

In [None]:
#q3
#What was the average inflation rate when their subscriptions were renewed?

In [None]:
# i am assuming the question is asking what was the avg inflation rate when  a client renewed their subscription

In [21]:
#ascertaing that the start_date and end_date follows the mentioned date format
subs_info = subs_info.withColumn('end_date',F.to_date('end_date','yyyy-MM-dd'))
financial_info = financial_info.withColumn('start_date',F.to_date('start_date','yyyy-MM-dd')) \
                               .withColumn('end_date',F.to_date('end_date','yyyy-MM-dd'))

In [23]:
#filter renewals
renewed_subs = subs_info.filter(F.col('renewed') == True)

In [28]:
#join the renewals such that end data of the renwal falls in between start and end dates of row in finalcial info
joined_df = renewed_subs.alias('t1')\
.join(financial_info.alias('t2'),
      on = (F.col('t1.end_date')>= F.col('t2.start_date')) & (F.col('t1.end_date')<= F.col('t2.end_date')),
      how='inner')

In [29]:
joined_df.show()

+----------+-----------------+----------+----------+-------+---+----------+----------+--------------+---------------+
| client_id|subscription_type|start_date|  end_date|renewed| id|start_date|  end_date|inflation_rate|gdp_growth_rate|
+----------+-----------------+----------+----------+-------+---+----------+----------+--------------+---------------+
|4309371709|          Monthly|2021-05-24|2021-06-23|   true| 13|2021-04-01|2021-06-30|          0.76|           3.63|
|3183675157|           Yearly|2021-12-25|2022-12-25|   true| 19|2022-10-01|2022-12-31|           4.4|           1.05|
|5371694837|          Monthly|2020-03-14|2020-04-13|   true|  9|2020-04-01|2020-06-30|          4.69|           1.23|
|4687291312|           Yearly|2019-06-14|2020-06-13|   true|  9|2020-04-01|2020-06-30|          4.69|           1.23|
|6038028440|           Yearly|2018-12-03|2019-12-03|   true|  7|2019-10-01|2019-12-31|          2.71|           1.79|
|6955217420|          Monthly|2019-07-07|2019-08-06|   t

In [31]:
#calc the avg. inflation
joined_df.agg(
    F.avg('inflation_rate').alias('avg_inflation_rate')
).collect()[0]['avg_inflation_rate']

4.311800000000001

In [None]:
#q4
#What is the median amount paid each year for all payment methods?

In [32]:
payment_info.show()

+----------+------------+-----------+--------------+
| client_id|payment_date|Amount_paid|Payment_method|
+----------+------------+-----------+--------------+
|6292156167|  2019-09-16|      447.0| Bank Transfer|
|7462725203|  2018-05-21|      379.7| Bank Transfer|
|4698004907|  2021-09-11|      435.1|         Check|
|3510240337|  2020-12-07|      413.1|         Check|
|7501599785|  2019-03-04|       61.1| Bank Transfer|
|8719792472|  2018-02-10|       73.8|   Credit Card|
|3325348894|  2019-07-09|      348.1|   Credit Card|
|9031632460|  2019-07-06|      222.9|         Check|
|5319487809|  2019-03-02|       90.7| Bank Transfer|
|4280387012|  2022-11-25|       77.1|         Check|
|3891000577|  2020-08-16|      241.2| Bank Transfer|
|5396477027|  2021-02-13|      494.6|         Check|
|1089413096|  2019-04-21|      453.6|   Credit Card|
|5536015034|  2019-11-15|       65.4|   Credit Card|
|7513745319|  2020-09-17|      284.5|   Credit Card|
|3878940490|  2018-05-23|      489.4| Bank Tra

In [36]:
#group by payment category and year
#calculat the median by the grouped category
result = payment_info.groupBy(F.col('Payment_method'),F.year(F.col('payment_date'))).agg(
    F.median(F.col('Amount_paid')).alias("median_amount")
).orderBy(F.col('Payment_method'),F.year(F.col('payment_date')))

result.show()

+--------------+------------------+-------------+
|Payment_method|year(payment_date)|median_amount|
+--------------+------------------+-------------+
| Bank Transfer|              2018|       281.65|
| Bank Transfer|              2019|        184.2|
| Bank Transfer|              2020|        225.1|
| Bank Transfer|              2021|        255.3|
| Bank Transfer|              2022|        196.5|
|         Check|              2018|        216.6|
|         Check|              2019|        410.2|
|         Check|              2020|        413.1|
|         Check|              2021|        435.1|
|         Check|              2022|        275.5|
|   Credit Card|              2018|       229.15|
|   Credit Card|              2019|        401.9|
|   Credit Card|              2020|       285.25|
|   Credit Card|              2021|        208.7|
|   Credit Card|              2022|        326.2|
+--------------+------------------+-------------+



In [37]:
spark.stop()