# <span style='color:green'> Consumer Financial Protection Bureau Dataset </span> 
The analysis of complaint data from the Consumer Financial Protection Bureau (CFPB), covering complaints received between December 2011 and April 2017, is organized as follows:
1. **Initializing Spark Environment**
2. **Importing the dataset from the URL into a DataFrame**
3. Data set **metadata analysis** and **Data Preparation**:
  1. Display **schema and size** of the DataFrame
  2. **Dropping Columns**
  3. Renaming Columns
  4. Get one or multiple **random samples** from the data set
  5. Identify **data entities**, **metrics** and **dimensions**
  6. **Columns categorization**
4. Columns groups **basic profiling** to better understand our data set:
  1. **Timing related** columns basic profiling
  2. **Complaints related** columns basic profiling
  3. **Resolution related** columns basic profiling
  4. **Location related** columns basic profiling
5. **Business Questions**
  1. Overall **on-time complaint resolution rate**
  2. **Time Analysis**
    1. Number of complaints by Year
    2. Company with most complaints received by Year
    3. Submission platform usage frequency by Year
  3. **Issue Analysis**
    1. Top 20 - Most frequently reported Issues
    2. Top 5 issues reported by Product
    3. Top 20 - Most frequently reported Issues not resolved on time
    4. Top 20 - Issues disputed by customer
  4. Predicting **Number of complaints pending** for 2017

  
## 1. Initializing Spark Environment

In [1]:
import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

## 2. Importing the dataset from the URL into a DataFrame

In [2]:
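import pandas as pd
from pyspark.sql.types import StructType,StructField, IntegerType, DoubleType, StringType, DateType, BooleanType

# Load the bank-account and credit-card complaint extracts. astype("str") reads every column as text,
# which also turns missing values into the literal string "nan"
df_bank_complaints = pd.read_csv('https://query.data.world/s/ukhnhebhh2hvofs3r2gebe2cvn7zne').astype("str")
df_credit_card_complaints = pd.read_csv('https://query.data.world/s/n55tlkemw7qplik7cbg5kkjmqu55ds').astype("str")
# Stack both extracts row-wise and hand the combined pandas DataFrame to Spark
df_pd = pd.concat([df_bank_complaints,df_credit_card_complaints],axis=0)
df = spark.createDataFrame(df_pd)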
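Because `astype("str")` turns every missing value into the literal string `"nan"`, Spark never sees a real null in the string columns, so `isNull()` checks on those columns will always report 0. Below is a minimal plain-Python sketch of mapping that sentinel back to a real missing value. It runs on a record shaped like the sampled rows, and the helper name `normalize_missing` is hypothetical. In PySpark, `df.replace("nan", None)` should have a similar effect on string columns, but treat that as an assumption to verify on your Spark version.

```python
MISSING_SENTINELS = {"nan"}  # what str(float("nan")) produces, i.e. what astype("str") writes for NaN


def normalize_missing(record):
    """Return a copy of a record with the 'nan' sentinel replaced by None."""
    return {key: (None if value in MISSING_SENTINELS else value) for key, value in record.items()}


# Hypothetical record modelled on the sampled rows shown later in the notebook
row = {"Product": "Bank account or service", "subIssue": "nan", "Tags": "nan", "State": "NY"}
cleaned = normalize_missing(row)
print(cleaned)
# {'Product': 'Bank account or service', 'subIssue': None, 'Tags': None, 'State': 'NY'}
```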

## 3. Data set metadata analysis and Data Preparation
### A. Display schema and size of the DataFrame

In [3]:
from IPython.display import display, Markdown
df.printSchema()
display(Markdown("This DataFrame has **%d rows**." % df.count()))

root
 |-- Date received: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Sub-product: string (nullable = true)
 |-- Issue: string (nullable = true)
 |-- Sub-issue: string (nullable = true)
 |-- Consumer complaint narrative: string (nullable = true)
 |-- Company public response: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZIP code: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Consumer consent provided?: string (nullable = true)
 |-- Submitted via: string (nullable = true)
 |-- Date sent to company: string (nullable = true)
 |-- Company response to consumer: string (nullable = true)
 |-- Timely response?: string (nullable = true)
 |-- Consumer disputed?: string (nullable = true)
 |-- Complaint ID: string (nullable = true)



This DataFrame has **172529 rows**.

### B. Dropping Columns 
* **Complaint ID**: a unique identifier assigned to each complaint for record-keeping purposes
* **Date sent to company**: the dataset does not record when a complaint was resolved, and the *Timely response?* column already captures whether the company responded on time, so this date adds nothing to the analysis
* **Consumer complaint narrative**: a free-text narration of the customer's complaint, which will not be used to gain insights

In [4]:
df = df.drop("Complaint ID","Date sent to company","Consumer complaint narrative")

### C. Renaming the columns to follow camel case

In [5]:
df = df.withColumnRenamed("Date received","dateRecvd")
df = df.withColumnRenamed("Sub-product","subProduct")
df = df.withColumnRenamed("Sub-issue","subIssue")
df = df.withColumnRenamed("Company public response","companyPublicResponse")
df = df.withColumnRenamed("ZIP code","zipCode")
df = df.withColumnRenamed("Consumer consent provided?","consumerConsent")
df = df.withColumnRenamed("Submitted via","submittedVia")
df = df.withColumnRenamed("Company response to consumer","companyResponse")
df = df.withColumnRenamed("Timely response?","timelyResponse")
df = df.withColumnRenamed("Consumer disputed?","consumerDisputed")
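The renames above are written out by hand. A small helper could derive camelCase names mechanically instead. Below is a minimal sketch; the helper `to_camel_case` is hypothetical and not part of PySpark. It would produce `dateReceived` and `consumerConsentProvided` rather than the shorter hand-picked `dateRecvd` and `consumerConsent`, and it would also lowercase `Product`, `Issue`, `Company`, `State` and `Tags`. In PySpark, something like `df.toDF(*[to_camel_case(c) for c in df.columns])` would apply it to every column at once.

```python
import re


def to_camel_case(name: str) -> str:
    """Convert a raw CFPB column header to camelCase, e.g. 'Sub-product' -> 'subProduct'."""
    words = re.findall(r"[A-Za-z0-9]+", name)  # splits on spaces and hyphens and drops '?'
    if not words:
        return name
    return words[0].lower() + "".join(word.capitalize() for word in words[1:])


headers = ["Sub-product", "ZIP code", "Submitted via", "Timely response?", "Date received"]
print([to_camel_case(h) for h in headers])
# ['subProduct', 'zipCode', 'submittedVia', 'timelyResponse', 'dateReceived']
```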

### D. Get one or multiple random samples from the data set

In [6]:
df.cache() # mark df for caching: it is materialized by the first action and reused by later queries
df.sample(False, 0.1).take(2)

[Row(dateRecvd='07/30/2013', Product='Bank account or service', subProduct='Checking account', Issue='Using a debit or ATM card', subIssue='nan', companyPublicResponse='nan', Company='JPMorgan Chase & Co.', State='NY', zipCode='11772', Tags='nan', consumerConsent='nan', submittedVia='Web', companyResponse='Closed with explanation', timelyResponse='Yes', consumerDisputed='No'),
 Row(dateRecvd='07/30/2013', Product='Bank account or service', subProduct='Savings account', Issue='Deposits and withdrawals', subIssue='nan', companyPublicResponse='nan', Company='JPMorgan Chase & Co.', State='NY', zipCode='10469', Tags='nan', consumerConsent='nan', submittedVia='Web', companyResponse='Closed with explanation', timelyResponse='Yes', consumerDisputed='No')]
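`df.sample(False, 0.1)` does not return exactly 10% of the rows. The PySpark documentation states that the fraction is not guaranteed, because each row is kept independently with roughly that probability. As a result, the sample size and content change between runs unless a `seed` is passed, e.g. `df.sample(False, 0.1, seed=42)`. Below is a plain-Python sketch of this Bernoulli sampling idea. The helper `bernoulli_sample` is hypothetical and only illustrates the concept, not Spark's per-partition implementation.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (sampling without replacement)."""
    rng = random.Random(seed)  # a dedicated generator, so a fixed seed gives a repeatable sample
    return [row for row in rows if rng.random() < fraction]


rows = list(range(10_000))
sample_a = bernoulli_sample(rows, 0.1, seed=42)
sample_b = bernoulli_sample(rows, 0.1, seed=42)
print(len(sample_a))         # close to 1000, but not necessarily exactly 1000
print(sample_a == sample_b)  # True: same seed, same sample
```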

### E. Data entities, metrics and dimensions

I've identified the following elements:

* **Entities:** Product, Sub-Product, Issue, Sub-Issue, Company, Tags
* **Metrics:** Company Public Response, Company Response to Consumer, Timely response?, Consumer disputed?
* **Dimensions:** State, ZIP Code, Company, Submitted via

### F. Column categorization

The following could be a potential column categorization:

* **Timing related columns:** *Date received*, *Timely response?*
* **Complaint related columns:** *Product*, *Sub-Product*, *Issue*, *Sub-Issue*, *Company*, *Tags*, *Submitted Via*
* **Resolution related columns:** *Consumer Disputed?*, *Company Response*, *Company Public Response*
* **Location related columns:** *Consumer Consent*, *State*, *ZIP Code*

## 4. Columns groups basic profiling to better understand our data set
### A. Timing related columns basic profiling

In [7]:
from pyspark.sql.functions import *  # NOTE: this shadows Python built-ins such as min, max, round and sum

display(Markdown("<strong><em>Since the Date Received column is stored as a string, we convert it to a date, "\
                 "and we cast Timely Response to Boolean.</em></strong>"))

# Parse the 'MM/dd/yyyy' strings into a DateType column
df = df.withColumn("dateRecvd",to_date(df.dateRecvd, 'MM/dd/yyyy'))
# Spark's string-to-boolean cast turns "Yes"/"No" into true/false
df = df.withColumn("timelyResponse",col("timelyResponse").cast(BooleanType()))

print("Checking for nulls on columns Date Received and Timely response?:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["dateRecvd","timelyResponse"]]).show()
print("Checking amount of distinct values in columns Date Received and Timely response?:")
df.select([countDistinct(c).alias(c) for c in ["dateRecvd","timelyResponse"]]).show()
print("Minimum Date Received:")
df.select(min("dateRecvd")).show()
print("Maximum Date Received:")
df.select(max("dateRecvd")).show()

<strong><em>Since the Date Received column is stored as a string, we convert it to a date, and we cast Timely Response to Boolean.</em></strong>

Checking for nulls on columns Date Received and Timely response?:
+---------+--------------+
|dateRecvd|timelyResponse|
+---------+--------------+
|        0|             0|
+---------+--------------+

Checking amount of distinct values in columns Date Received and Timely response?:
+---------+--------------+
|dateRecvd|timelyResponse|
+---------+--------------+
|     1958|             2|
+---------+--------------+

Minimum Date Received:
+--------------+
|min(dateRecvd)|
+--------------+
|    2011-12-01|
+--------------+

Maximum Date Received:
+--------------+
|max(dateRecvd)|
+--------------+
|    2017-04-10|
+--------------+
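The two conversions above can be mirrored in plain Python, which is handy for spot-checking individual values. Spark's `'MM/dd/yyyy'` pattern corresponds to `strptime`'s `'%m/%d/%Y'`, and the Yes/No flags map to booleans. Below is a minimal sketch; the helper names are hypothetical. It maps any unrecognized value, such as the `"nan"` sentinel, to `None`.

```python
from datetime import date, datetime


def parse_date_received(value: str) -> date:
    """Parse a 'Date received' string; '%m/%d/%Y' is the strptime twin of Spark's 'MM/dd/yyyy'."""
    return datetime.strptime(value, "%m/%d/%Y").date()


def parse_yes_no(value: str):
    """Map 'Yes'/'No' (any case) to True/False; anything else, e.g. 'nan', becomes None."""
    return {"yes": True, "no": False}.get(value.strip().lower())


print(parse_date_received("07/30/2013"))                              # 2013-07-30
print(parse_yes_no("Yes"), parse_yes_no("No"), parse_yes_no("nan"))  # True False None
```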



### B. Complaints related columns basic profiling

In [8]:
print("Checking for nulls on columns Product, Sub-Product, Issue, Sub-Issue:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["Product","subProduct","Issue","subIssue"]]).show()
print("Checking amount of distinct values in columns Product, Sub-Product, Issue, Sub-Issue:")
df.select([countDistinct(c).alias(c) for c in ["Product","subProduct","Issue","subIssue"]]).show()

FreqProduct = df.groupBy("Product").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc())
FreqIssue = df.groupBy("Issue").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc())
FreqSubProduct = df.groupBy("subProduct").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc())
FreqSubIssue = df.groupBy("subIssue").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc())

print("Since Sub-Issue has only one distinct value ('nan', i.e. always missing), we drop that column from the DataFrame:")
df = df.drop("subIssue")

print("List of Products and Sub-Products (Categorical variable):")
FreqProduct = FreqProduct.withColumn("id", monotonically_increasing_id())
FreqSubProduct = FreqSubProduct.withColumn("id", monotonically_increasing_id())
top_DF = FreqProduct.join(FreqSubProduct, "id", "outer").drop("id")
top_DF.show(6,0)

display(Markdown("The most frequent issue is **%s**, which occurs **%d times**." %(FreqIssue.first().Issue,FreqIssue.first().Total)))

print("Checking for nulls on columns Company, Tags and Submitted Via:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["Company","Tags","submittedVia"]]).show()
print("Checking amount of distinct values in columns Company, Tags and Submitted Via:")
df.select([countDistinct(c).alias(c) for c in ["Company","Tags","submittedVia"]]).show()

FreqCompany = df.groupBy("Company").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc())
FreqTags = df.groupBy("Tags").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc())
FreqSubmittedVia = df.groupBy("submittedVia").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc())

display(Markdown("The company with the most complaints is **%s** with **%d complaints**."\
                 %(FreqCompany.first().Company,FreqCompany.first().Total)))

FreqTags = FreqTags.withColumn("id", monotonically_increasing_id())
FreqSubmittedVia = FreqSubmittedVia.withColumn("id", monotonically_increasing_id())
top_DF = FreqTags.join(FreqSubmittedVia, "id", "outer").drop("id")
top_DF.show(7,0)

Checking for nulls on columns Product, Sub-Product, Issue, Sub-Issue:
+-------+----------+-----+--------+
|Product|subProduct|Issue|subIssue|
+-------+----------+-----+--------+
|      0|         0|    0|       0|
+-------+----------+-----+--------+

Checking amount of distinct values in columns Product, Sub-Product, Issue, Sub-Issue:
+-------+----------+-----+--------+
|Product|subProduct|Issue|subIssue|
+-------+----------+-----+--------+
|      2|         6|   38|       1|
+-------+----------+-----+--------+

Since Sub-Issue has only one distinct value ('nan', i.e. always missing), we drop that column from the DataFrame:
List of Products and Sub-Products (Categorical variable):
+-----------------------+-----+----------------------------------+-----+
|Product                |Total|subProduct                        |Total|
+-----------------------+-----+----------------------------------+-----+
|Bank account or service|84811|Checking account                  |58197|
|Credit card            |87718|nan            

The most frequent issue is **Account opening, closing, or management**, which occurs **37349 times**.

Checking for nulls on columns Company, Tags and Submitted Via:
+-------+----+------------+
|Company|Tags|submittedVia|
+-------+----+------------+
|      0|   0|           0|
+-------+----+------------+

Checking amount of distinct values in columns Company, Tags and Submitted Via:
+-------+----+------------+
|Company|Tags|submittedVia|
+-------+----+------------+
|    665|   4|           7|
+-------+----+------------+



The company with the most complaints is **Bank of America** with **22674 complaints**.

+-----------------------------+------+------------+------+
|Tags                         |Total |submittedVia|Total |
+-----------------------------+------+------------+------+
|Older American               |17348 |Referral    |42852 |
|nan                          |147639|Web         |100396|
|null                         |null  |Fax         |2033  |
|null                         |null  |Email       |110   |
|Older American, Servicemember|1927  |Postal mail |8923  |
|null                         |null  |nan         |1     |
|Servicemember                |5615  |Phone       |18214 |
+-----------------------------+------+------------+------+
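Joining two frequency tables on `monotonically_increasing_id()` is fragile. The generated IDs are unique and increasing but not guaranteed to be consecutive, and the outer join does not preserve the sort order. This is why the Tags and Product columns above are not listed by descending Total. Below is a plain-Python sketch of the intended side-by-side layout that pairs the two ranked lists by explicit rank; the helper `side_by_side` and the toy values are hypothetical. In PySpark, a comparable approach would be to add `row_number().over(Window.orderBy(col("Total").desc()))` to each table, join on that rank, and sort by it.

```python
from collections import Counter
from itertools import zip_longest


def side_by_side(left, right, n):
    """Pair the n most frequent values of two columns by rank.

    Each output row is (rank, (left_value, left_count), (right_value, right_count));
    the shorter side is padded with None, like the outer join in the notebook.
    """
    left_top = Counter(left).most_common(n)
    right_top = Counter(right).most_common(n)
    return [(rank, l, r) for rank, (l, r) in enumerate(zip_longest(left_top, right_top), start=1)]


# Hypothetical toy columns standing in for Tags and submittedVia
tags = ["nan"] * 5 + ["Older American"] * 3 + ["Servicemember"]
via = ["Web"] * 4 + ["Referral"] * 2 + ["Phone"] * 2 + ["Fax"]
table = side_by_side(tags, via, 4)
for row in table:
    print(row)
```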



### C. Resolution related columns basic profiling

In [9]:
print("Checking for nulls on columns Consumer Disputed?, Company Response, Company Public Response:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["consumerDisputed","companyResponse","companyPublicResponse"]]).show()
print("Checking amount of distinct values in columns Consumer Disputed?, Company Response, Company Public Response:")
df.select([countDistinct(c).alias(c) for c in ["consumerDisputed","companyResponse","companyPublicResponse"]]).show()

FreqConsumerDisputed = df.groupBy("consumerDisputed").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc())
FreqCompanyResponse = df.groupBy("companyResponse").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc())
FreqcompanyPublicResponse = df.groupBy("companyPublicResponse").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc())

FreqConsumerDisputed = FreqConsumerDisputed.withColumn("id", monotonically_increasing_id())
FreqCompanyResponse = FreqCompanyResponse.withColumn("id", monotonically_increasing_id())
FreqcompanyPublicResponse = FreqcompanyPublicResponse.withColumn("id", monotonically_increasing_id())
top_DF = FreqConsumerDisputed.join(FreqCompanyResponse, "id", "outer").join(FreqcompanyPublicResponse,"id","outer").drop("id")
top_DF.show(truncate=True)

Checking for nulls on columns Consumer Disputed?, Company Response, Company Public Response:
+----------------+---------------+---------------------+
|consumerDisputed|companyResponse|companyPublicResponse|
+----------------+---------------+---------------------+
|               0|              0|                    0|
+----------------+---------------+---------------------+

Checking amount of distinct values in columns Consumer Disputed?, Company Response, Company Public Response:
+----------------+---------------+---------------------+
|consumerDisputed|companyResponse|companyPublicResponse|
+----------------+---------------+---------------------+
|               3|              8|                   11|
+----------------+---------------+---------------------+

+----------------+------+--------------------+------+---------------------+------+
|consumerDisputed| Total|     companyResponse| Total|companyPublicResponse| Total|
+----------------+------+--------------------+------+---------------------+------+


### D. Location related columns basic profiling

In [10]:
print("Checking for nulls on columns Consumer Consent, State and ZIP Code:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["consumerConsent","State","zipCode"]]).show()
print("Checking amount of distinct values in columns Consumer Consent, State and ZIP Code:")
df.select([countDistinct(c).alias(c) for c in ["consumerConsent","State","zipCode"]]).show()

FreqConsumerConsent = df.groupBy("consumerConsent").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc())
FreqConsumerConsent.show()

# Run each top-1 aggregation once and reuse the resulting Row
top_state = df.groupBy("State").agg(count(lit(1)).alias("Total")).orderBy(col("Total").desc()).first()
top_zip = df.filter(col("zipCode")!="nan").groupBy("zipCode").agg(count(lit(1)).alias("Total"))\
            .orderBy(col("Total").desc()).first()
display(Markdown("Most complaints are reported in State **%s** with **%d** occurrences and at ZIP Code **%s** "\
                 "with **%d** occurrences." % (top_state.State, top_state.Total, top_zip.zipCode, top_zip.Total)))

Checking for nulls on columns Consumer Consent, State and ZIP Code:
+---------------+-----+-------+
|consumerConsent|State|zipCode|
+---------------+-----+-------+
|              0|    0|      0|
+---------------+-----+-------+

Checking amount of distinct values in columns Consumer Consent, State and ZIP Code:
+---------------+-----+-------+
|consumerConsent|State|zipCode|
+---------------+-----+-------+
|              5|   63|  19347|
+---------------+-----+-------+

+--------------------+------+
|     consumerConsent| Total|
+--------------------+------+
|                 nan|120594|
|    Consent provided| 31293|
|Consent not provided| 19571|
|               Other|  1069|
|   Consent withdrawn|     2|
+--------------------+------+



Most complaints are reported in State **CA** with **24038** occurrences and at ZIP Code **100XX** with **504** occurrences.

## 5. Business Questions
### 1. Overall on-time complaint resolution rate

In [11]:
# Count each outcome once and reuse the numbers instead of re-running count() six times
total = df.count()
on_time = df.filter(col("timelyResponse")).count()
late = df.filter(~col("timelyResponse")).count()
display(Markdown("""
| Timely Response? | Yes | No |
|----|----|----|
| **Count** | %d | %d |
| **%%** | %f | %f |
""" % (on_time, late, on_time / total * 100, late / total * 100)))


| Timely Response? | Yes | No |
|----|----|----|
| **Count** | 169376 | 3153 |
| **%** | 98.172481 | 1.827519 |
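The percentage row divides each count by the total number of complaints (the 172529 rows reported earlier). For the on-time share:

$$
\text{on-time rate} = \frac{169376}{169376 + 3153} \times 100 = \frac{169376}{172529} \times 100 \approx 98.17\%
$$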


### 2. Time Analysis

In [12]:
from itertools import chain
from pyspark.sql.window import Window

display(Markdown("**a. Number of complaints by Year**"))
df.withColumn("Year",year(df.dateRecvd)).groupBy("Year").agg(count(lit(1)).alias("Total"))\
    .orderBy(col("Total").desc()).show()

display(Markdown("**b. Company with most complaints received by Year**"))
groupDF = df.withColumn("Year",year(df.dateRecvd)).groupBy("Year","Company").agg(count(lit(1)).alias("Total"))\
            .select("Company","Year","Total").orderBy(col("Total").desc())
window = Window.partitionBy(groupDF['Year']).orderBy(groupDF['Total'].desc())
groupDF = groupDF.withColumn("rank",rank().over(window))
groupDF.select("Year","Company","Total")\
  .filter(col('rank') <= 1) \
  .sort(col("Year"))\
  .show()

display(Markdown("**c. Submission platform usage frequency by Year**<br/>"))

df.withColumn("Year",year(df.dateRecvd)).filter(col("submittedVia")!="nan")\
    .groupBy("Year").pivot("submittedVia").count().show()

**a. Number of complaints by Year**

+----+-----+
|Year|Total|
+----+-----+
|2016|42916|
|2015|34442|
|2014|28638|
|2012|27579|
|2013|26495|
|2017|11199|
|2011| 1260|
+----+-----+



**b. Company with most complaints received by Year**

+----+---------------+-----+
|Year|        Company|Total|
+----+---------------+-----+
|2011|    Capital One|  237|
|2012|Bank of America| 4122|
|2013|Bank of America| 3693|
|2014|Bank of America| 3760|
|2015|Bank of America| 4570|
|2016|       Citibank| 6511|
|2017|       Citibank| 1452|
+----+---------------+-----+



**c. Submission platform usage frequency by Year**<br/>

+----+-----+---+-----+-----------+--------+-----+
|Year|Email|Fax|Phone|Postal mail|Referral|  Web|
+----+-----+---+-----+-----------+--------+-----+
|2015|    2|375| 3277|       1586|    7293|21909|
|2013|   58|418| 3566|       1829|    7555|13069|
|2014|    3|404| 3301|       1840|    6816|16274|
|2012|   40|241| 3416|       1237|   10405|12240|
|2016|    4|466| 3770|       1898|    8096|28682|
|2011|    3|  6|   76|         28|     557|  590|
|2017| null|123|  808|        505|    2130| 7632|
+----+-----+---+-----+-----------+--------+-----+
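The per-year ranking in part b. keeps rows with `rank() <= 1`. If two companies tied for the most complaints in a year, both would be listed, whereas `row_number()` would keep exactly one. Below is a plain-Python sketch of the two semantics; the helper name and the counts are hypothetical. Note that Spark does not guarantee which tied row `row_number()` keeps unless the window ordering includes a tie-breaker column.

```python
from collections import defaultdict


def top_k_per_group(rows, group_key, value_key, k, keep_ties=True):
    """Top-k rows per group by descending value.

    keep_ties=True mirrors `rank() <= k`: tied rows share a rank, so a group may return more than k rows.
    keep_ties=False mirrors `row_number() <= k`: exactly k rows; here ties fall back to input order.
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row[group_key]].append(row)
    result = {}
    for key, members in groups.items():
        members = sorted(members, key=lambda r: r[value_key], reverse=True)  # stable sort
        if keep_ties:
            # rank = 1 + number of rows in the group with a strictly larger value
            result[key] = [r for r in members
                           if 1 + len([o for o in members if o[value_key] > r[value_key]]) <= k]
        else:
            result[key] = members[:k]
    return result


# Hypothetical counts with a tie for first place in 2016
counts = [
    {"Year": 2016, "Company": "A", "Total": 10},
    {"Year": 2016, "Company": "B", "Total": 10},
    {"Year": 2016, "Company": "C", "Total": 7},
    {"Year": 2017, "Company": "A", "Total": 4},
]
print(len(top_k_per_group(counts, "Year", "Total", 1)[2016]))                   # 2 (tie kept)
print(len(top_k_per_group(counts, "Year", "Total", 1, keep_ties=False)[2016]))  # 1
```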



### 3. Issue Analysis

In [13]:
df.createOrReplaceTempView('df')

display(Markdown("**a. Top 20 - Most frequently reported Issues**<br/>"))
spark.sql('SELECT Issue,Count(Issue) as Occurrence from df group by Issue order by count(Issue) desc').show(20,0)

display(Markdown("**b. Top 5 issues reported by Product**"))
groupDF = df.groupBy("Product","Issue").agg(count(lit(1)).alias("Total"))\
            .select("Product","Issue","Total").orderBy(col("Total").desc())
window = Window.partitionBy(groupDF['Product']).orderBy(groupDF['Total'].desc())
groupDF = groupDF.withColumn("rank",rank().over(window))
groupDF.select("Product","Issue","Total")\
  .filter(col('rank') <= 5) \
  .show(10,0)

display(Markdown("**c. Top 20 - Most frequently reported Issues not resolved on time**<br/>"))
spark.sql("SELECT Issue,Count(Issue) as Occurrence from df where timelyResponse = false group by "\
            "Issue order by count(Issue) desc").show(20,0)

display(Markdown("**d. Top 20 - Issues disputed by customers**<br/>"))
spark.sql("SELECT Issue,Count(Issue) as Occurrence from df where consumerDisputed = 'Yes' group by "\
            "Issue order by count(Issue) desc").show(20,0)

**a. Top 20 - Most frequently reported Issues**<br/>

+----------------------------------------+----------+
|Issue                                   |Occurrence|
+----------------------------------------+----------+
|Account opening, closing, or management |37349     |
|Deposits and withdrawals                |22491     |
|Billing disputes                        |14827     |
|Problems caused by my funds being low   |11688     |
|Other                                   |9144      |
|Identity theft / Fraud / Embezzlement   |8330      |
|Making/receiving payments, sending money|7266      |
|Closing/Cancelling account              |6291      |
|Using a debit or ATM card               |6017      |
|APR or interest rate                    |5463      |
|Late fee                                |3576      |
|Customer service / Customer relations   |3458      |
|Delinquent account                      |3155      |
|Credit determination                    |3013      |
|Advertising and marketing               |2894      |
|Rewards                    

**b. Top 5 issues reported by Product**

+-----------------------+----------------------------------------+-----+
|Product                |Issue                                   |Total|
+-----------------------+----------------------------------------+-----+
|Credit card            |Billing disputes                        |14827|
|Credit card            |Other                                   |9144 |
|Credit card            |Identity theft / Fraud / Embezzlement   |8330 |
|Credit card            |Closing/Cancelling account              |6291 |
|Credit card            |APR or interest rate                    |5463 |
|Bank account or service|Account opening, closing, or management |37349|
|Bank account or service|Deposits and withdrawals                |22491|
|Bank account or service|Problems caused by my funds being low   |11688|
|Bank account or service|Making/receiving payments, sending money|7266 |
|Bank account or service|Using a debit or ATM card               |6017 |
+-----------------------+--------------------------

**c. Top 20 - Most frequently reported Issues not resolved on time**<br/>

+----------------------------------------+----------+
|Issue                                   |Occurrence|
+----------------------------------------+----------+
|Account opening, closing, or management |1011      |
|Deposits and withdrawals                |546       |
|Problems caused by my funds being low   |266       |
|Making/receiving payments, sending money|221       |
|Using a debit or ATM card               |141       |
|Billing disputes                        |139       |
|Identity theft / Fraud / Embezzlement   |127       |
|Other                                   |99        |
|Closing/Cancelling account              |70        |
|Unsolicited issuance of credit card     |60        |
|Credit card protection / Debt protection|48        |
|APR or interest rate                    |46        |
|Late fee                                |31        |
|Transaction issue                       |28        |
|Customer service / Customer relations   |28        |
|Delinquent account         

**d. Top 20 - Issues disputed by customers**<br/>

+----------------------------------------+----------+
|Issue                                   |Occurrence|
+----------------------------------------+----------+
|Account opening, closing, or management |7399      |
|Deposits and withdrawals                |4151      |
|Billing disputes                        |3228      |
|Other                                   |2055      |
|Problems caused by my funds being low   |1786      |
|Identity theft / Fraud / Embezzlement   |1553      |
|Making/receiving payments, sending money|1420      |
|Closing/Cancelling account              |1354      |
|Using a debit or ATM card               |1164      |
|APR or interest rate                    |1067      |
|Rewards                                 |744       |
|Credit determination                    |686       |
|Customer service / Customer relations   |685       |
|Advertising and marketing               |653       |
|Delinquent account                      |625       |
|Credit card protection / De
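The "not resolved on time" ranking largely mirrors overall volume, because issues with more complaints also accumulate more late responses. Dividing the late count by each issue's total gives a rate that can be compared across issues. Below is a minimal sketch that uses counts copied from outputs a. and c. above for six issues visible in both tables; the helper `late_rate` is hypothetical.

```python
# Counts copied from outputs a. (all complaints) and c. (not resolved on time) above
total_by_issue = {
    "Account opening, closing, or management": 37349,
    "Deposits and withdrawals": 22491,
    "Billing disputes": 14827,
    "Problems caused by my funds being low": 11688,
    "Making/receiving payments, sending money": 7266,
    "Using a debit or ATM card": 6017,
}
late_by_issue = {
    "Account opening, closing, or management": 1011,
    "Deposits and withdrawals": 546,
    "Billing disputes": 139,
    "Problems caused by my funds being low": 266,
    "Making/receiving payments, sending money": 221,
    "Using a debit or ATM card": 141,
}


def late_rate(issue):
    """Percentage of an issue's complaints that did not get a timely response."""
    return late_by_issue[issue] / total_by_issue[issue] * 100


for issue in sorted(total_by_issue, key=late_rate, reverse=True):
    print(f"{issue:<42}{late_rate(issue):6.2f}%")
```

With these numbers, *Billing disputes* moves from sixth place by late count to the lowest rate of the six (about 0.94%), while *Making/receiving payments, sending money* has the highest (about 3.04%).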

### 4. Predicting Number of complaints pending for 2017
Since the dataset ends on 10 April 2017, we can use the previous years to predict the total number of complaints expected in 2017 and estimate how many more are still to come, so that enough resources can be allocated.<br/>
I fit a linear regression model with Year as the independent variable (x) and the total number of complaints reported each year as the dependent variable (y).

In [14]:
df_ml = df.withColumn("Year",year(df.dateRecvd)).select('Year').groupBy('Year').count().orderBy(col("Year"))
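With a single feature and default settings (no regularization, intercept fitted), `LinearRegression` should reduce to ordinary least squares, which has a closed form. Below is a plain-Python sketch of that fit on the yearly totals from the "Number of complaints by Year" output; the helper names are hypothetical. Trained on 2011-2016 like the Spark model in this section, it gives a 2017 estimate of about 49,990. The earliest complaint is dated 2011-12-01, so the 2011 total covers only December. Dropping that year lowers the slope and brings the 2017 estimate down to about 43,600, so the forecast is sensitive to that choice.

```python
import math


def ols_fit(xs, ys):
    """Closed-form simple linear regression y = slope * x + intercept (ordinary least squares)."""
    n = len(xs)
    mean_x = math.fsum(xs) / n
    mean_y = math.fsum(ys) / n
    sxx = math.fsum((x - mean_x) ** 2 for x in xs)
    sxy = math.fsum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


# Yearly complaint totals copied from the "Number of complaints by Year" output
yearly = {2011: 1260, 2012: 27579, 2013: 26495, 2014: 28638, 2015: 34442, 2016: 42916}


def predict(target_year, years):
    slope, intercept = ols_fit(years, [yearly[y] for y in years])
    return slope * target_year + intercept


all_years = sorted(yearly)                        # 2011-2016, the notebook's training set
full_years = [y for y in all_years if y >= 2012]  # drop 2011, which only covers December

print(f"{predict(2017, all_years):.0f}")   # 49990
print(f"{predict(2017, full_years):.0f}")  # 43600
```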

In [15]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [16]:
assembler = VectorAssembler(inputCols=['Year'],outputCol='features')
output = assembler.transform(df_ml)
# Train on the years before 2017 and keep the partially observed 2017 row for prediction
train_data = output.filter(col("Year") < 2017).select('features','count')
test_data = output.filter(col("Year") == 2017).select('features','count')
lr = LinearRegression(featuresCol='features', labelCol='count')
lr_model = lr.fit(train_data)

In [17]:
unlabeled_data = test_data.select('features')
predictions = lr_model.transform(unlabeled_data)

In [18]:
predictions.select("features",round("prediction").alias("prediction")).show()
display(Markdown("Complaints already received in 2017 (up to 10 April 2017):"))
df_ml.filter(col("Year")=="2017").show()

+--------+----------+
|features|prediction|
+--------+----------+
|[2017.0]|   49990.0|
+--------+----------+



Complaints already received in 2017 (up to 10 April 2017):

+----+-----+
|Year|count|
+----+-----+
|2017|11199|
+----+-----+



In [19]:
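predicted_2017 = predictions.first().prediction
# Row.count is the tuple method, so the "count" column has to be read by key
observed_2017 = df_ml.filter(col("Year") == 2017).first()["count"]
display(Markdown("We can anticipate roughly **%.0f more complaints** to be registered in the rest of 2017."
                 % (predicted_2017 - observed_2017)))

We can anticipate roughly **38791 more complaints** to be registered in the rest of 2017.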

### Thank You!