# Assignments

## Assignment - 1

In [1]:
import findspark

findspark.init()

In [2]:
import pyspark

from pyspark.sql import SparkSession

from pyspark.sql.functions import *
from pyspark.sql.types import  *


In [18]:
spark = SparkSession.builder.master("local").appName("SparkApp").getOrCreate()
#.config("spark.jars", "/usr/share/java/mysql-connector-java-8.0.22.jar")\


# Weather Data Analysis with DataFrame API

## a) Load Weather Dataset and create DataFrame
### *The data is seperated by one or more spaces. So first transform the data and do the calculations*

In [20]:
path  = "D:/futurense_hadoop-pyspark/labs/dataset/weather/weather_data.txt"

In [21]:
columns = ['WBANNO',
 'LST_DATE',
 'CRX_VN',
 'LONGITUDE',
 'LATITUDE',
 'T_DAILY_MAX',
 'T_DAILY_MIN',
 'T_DAILY_MEAN',
 'T_DAILY_AVG',
 'P_DAILY_CALC',
 'SOLARAD_DAILY',
 'SUR_TEMP_DAILY_TYPE',
 'SUR_TEMP_DAILY_MAX',
 'SUR_TEMP_DAILY_MIN',
 'SUR_TEMP_DAILY_AVG',
 'RH_DAILY_MAX',
 'RH_DAILY_MIN',
 'RH_DAILY_AVG',
 'SOIL_MOISTURE_5_DAILY',
 'SOIL_MOISTURE_10_DAILY',
 'SOIL_MOISTURE_20_DAILY',
 'SOIL_MOISTURE_50_DAILY',
 'SOIL_MOISTURE_100_DAILY',
 'SOIL_TEMP_5_DAILY',
 'SOIL_TEMP_10_DAILY',
 'SOIL_TEMP_20_DAILY',
 'SOIL_TEMP_50_DAILY',
 'SOIL_TEMP_100_DAILY']


### *First read the data remove extra spaces and make it as comma seperated values*

In [24]:
data = spark.read.load(path,format="csv")

#df = data.withColumn("values", regexp_replace(data.values, "\s+", ",")).withColumn("values",split(data.value,","))

df = data.withColumn("_c0",regexp_replace(data._c0,"\s+",","))

df.show()

+--------------------+
|                 _c0|
+--------------------+
|23907,20150101,2....|
|23907,20150102,2....|
|23907,20150103,2....|
|23907,20150104,2....|
|23907,20150105,2....|
|23907,20150106,2....|
|23907,20150107,2....|
|23907,20150108,2....|
|23907,20150109,2....|
|23907,20150110,2....|
|23907,20150111,2....|
|23907,20150112,2....|
|23907,20150113,2....|
|23907,20150114,2....|
|23907,20150115,2....|
|23907,20150116,2....|
|23907,20150117,2....|
|23907,20150118,2....|
|23907,20150119,2....|
|23907,20150120,2....|
+--------------------+
only showing top 20 rows



### *Split the data based on ',' and store values in respective columns*

In [25]:

split_data = df.select(split(df._c0, ",").alias("data"))


converted_data = split_data.select(
    *[split_data.data.getItem(idx).alias(columns[idx]) for idx in range(len(columns))])
    



## b) Show Min and Max Temperature

In [26]:
#select the required the columns and type cast them into required datatype     
converted_data.select(
                        max(col("T_DAILY_MAX").cast(FloatType())).alias("T_DAILY_MAX"),
                        min(col("T_DAILY_MIN").cast(FloatType())).alias("T_DAILY_MIN")
                      ).show()


+-----------+-----------+
|T_DAILY_MAX|T_DAILY_MIN|
+-----------+-----------+
|       36.0|       -7.9|
+-----------+-----------+



## c) Show month wise Min and Max Temperature

In [27]:
months = {'01':'Jan','02':'Feb','03':'Mar','04':'Apr','05':'May','06':'Jun',
          '07':'Jul','08':'Aug','09':'Sep','10':'Oct','11':'Nov','12':'Dec'}

def find_month_name(month_num):
    return months[month_num]

month_df = udf(find_month_name,StringType())


df = converted_data.withColumn('MONTH',month_df(substring('LST_DATE',5,2)))\
                   .select(
                            col('MONTH'),col("T_DAILY_MAX").cast(FloatType()).alias("T_DAILY_MAX"),
                            col("T_DAILY_MIN").cast(FloatType()).alias("T_DAILY_MIN")
                          )

df.groupBy('MONTH').agg(min('T_DAILY_MIN').alias('Min_temp'),max('T_DAILY_MAX').alias('Max_temp')).show()

+-----+--------+--------+
|MONTH|Min_temp|Max_temp|
+-----+--------+--------+
|  May|    14.3|    31.1|
|  Jun|     0.0|    33.6|
|  Feb|    -3.5|    26.6|
|  Mar|    -3.2|    29.1|
|  Jan|    -7.9|    26.5|
|  Apr|     8.0|    30.8|
|  Jul|    19.8|    36.0|
+-----+--------+--------+



# Assignment (Optional)
## Find the total count of ratings for each type of rating

In [28]:
path = r'C:/Users/rakes/OneDrive/Desktop/ml-latest-small/ratings.csv'

In [29]:
ratings_df = spark.read.csv(path,header=True)

ratings_df.groupBy('rating').count().show()

+------+-----+
|rating|count|
+------+-----+
|   1.0| 2811|
|   4.5| 8551|
|   2.5| 5550|
|   3.5|13136|
|   5.0|13211|
|   0.5| 1370|
|   4.0|26818|
|   1.5| 1791|
|   2.0| 7551|
|   3.0|20047|
+------+-----+



# Assignment-2

## Bank Marketing Campaign Data Analysis with DataFrame API

## a) Load Bank Marketing Dataset and create DataFrame

In [30]:
path = 'D:/futurense_hadoop-pyspark/labs/dataset/bankmarket/bankmarketdata.csv'
bank_df = spark.read.csv(path,header=True,sep=';',inferSchema=True)
bank_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)



## b.)	Give marketing success rate. (No. of people subscribed / total no. of entries)

In [31]:
total_count = bank_df.count()

success_rate = bank_df.filter(bank_df['y'] == 'yes').count() / total_count * 100

print("Success rate:",success_rate)


Success rate: 11.698480458295547


In [15]:
bank_df.createOrReplaceTempView('bank')

spark.sql("select sum(if(y='yes',1,0))/count(*) * 100 as success_rate from bank").show()

+------------------+
|      success_rate|
+------------------+
|11.698480458295547|
+------------------+



   ## c) Give marketing failure rate

In [32]:
failure_rate = bank_df.filter(bank_df.y == 'no').count() / total_count * 100

print("Failure rate:",failure_rate)

Failure rate: 88.30151954170445


In [33]:
spark.sql('select count(y)/(select count(*) from bank) * 100 as failure_rate from bank where y="no"').show()

+-----------------+
|     failure_rate|
+-----------------+
|88.30151954170445|
+-----------------+



## d) Maximum, Mean, and Minimum age of the average targeted customer

In [34]:
bank_df.select('age').agg(min(bank_df.age).alias('Min_Age'),
                          max(bank_df.age).alias('Max_Age'),
                          round(avg(bank_df.age),2).alias('Avg_age')).show()

+-------+-------+-------+
|Min_Age|Max_Age|Avg_age|
+-------+-------+-------+
|     18|     95|  40.94|
+-------+-------+-------+



In [35]:
spark.sql("select min(age) as min_age,max(age) as max_age, round(avg(age),1) as avg_age from bank").show()

+-------+-------+-------+
|min_age|max_age|avg_age|
+-------+-------+-------+
|     18|     95|   40.9|
+-------+-------+-------+



## e.)	Check the quality of customers by checking the average balance, median balance of customers

In [26]:
bank_df.agg(
            round(avg(bank_df.balance),2).alias("Avg_balance"),
            percentile_approx(bank_df.balance,0.5).alias("Median_balance")
            ).show()

+-----------+--------------+
|Avg_balance|Median_balance|
+-----------+--------------+
|    1362.27|           448|
+-----------+--------------+



In [27]:
spark.sql("select round(avg(balance),2) as avg_balance, percentile_approx(balance,0.5) as median_balance from bank").show()

+-----------+--------------+
|avg_balance|median_balance|
+-----------+--------------+
|    1362.27|           448|
+-----------+--------------+



## f.)	Check if age matters in marketing subscription for deposit

In [36]:
bank_df.filter(col('y') == 'yes').select('age').withColumn('Age_group', when(col('age') <= 20 ,'0-20')\
                                            .when((col('age') > 20) & (col('age') <= 30) , '21-30')\
                                             .when((col('age') > 30) & (col('age') <= 40) , '31-40')\
                                             .when((col('age') > 40) & (col('age') <= 50) , '41-50')\
                                             .when((col('age') > 50) & (col('age') <= 60) , '51-60')\
                                            .otherwise('61+')\
                                ).groupBy('Age_group').count().show()




+---------+-----+
|Age_group|count|
+---------+-----+
|      61+|  502|
|     0-20|   33|
|    21-30| 1112|
|    41-50| 1019|
|    31-40| 1812|
|    51-60|  811|
+---------+-----+



In [68]:

bank_df.createOrReplaceTempView('bank')


spark.sql("select age_group, count(age) as count from \
          (select age,case when age<=20 then '0-20' \
                when (age>=21 and age<=30) then '21-30'\
                when (age>=31 and age<=40) then '31-40' \
                when (age>=41 and age<=50) then '41-50' \
                when (age>=51 and age<=60) then '51-60' \
                else '60+' end as age_group \
               from bank where y = 'yes') a group by age_group").show()


+---------+-----+
|age_group|count|
+---------+-----+
|     0-20|   33|
|      60+|  502|
|    21-30| 1112|
|    41-50| 1019|
|    31-40| 1812|
|    51-60|  811|
+---------+-----+



## g) Show AgeGroup [Teenagers, Youngsters, MiddleAgers, Seniors] wise Subscription Count.

In [42]:
grouped_df = bank_df.filter(col('y')=='yes').select('age')\
       .withColumn('Age_group',when(col('age') <= 20,'Teenagers')
                               .when((col('age') >= 21) & (col('age') <= 40) ,'Youngsters')
                               .when((col('age') >= 41) & (col('age') <= 60) ,'MiddleAgers')
                               .otherwise('Seniors')
                  )

grouped_df.groupBy('Age_group').count().show()

+-----------+-----+
|  Age_group|count|
+-----------+-----+
| Youngsters| 2924|
|    Seniors|  502|
|  Teenagers|   33|
|MiddleAgers| 1830|
+-----------+-----+



In [70]:
spark.sql("select age_group, count(age) as count from (select age, case when age<=20 then 'Teenagers'\
                            when (age >= 21 and age <= 40) then 'Youngsters'\
                            when (age >= 41 and age <=60) then 'MiddleAgers'\
                            else 'Seniors' end as age_group from bank where y = 'yes') a group by age_group").show()

+-----------+-----+
|  age_group|count|
+-----------+-----+
| Youngsters| 2924|
|    Seniors|  502|
|  Teenagers|   33|
|MiddleAgers| 1830|
+-----------+-----+



## h) Check if marital status mattered for subscription to deposit.

In [43]:
bank_df.select('marital','y').filter(bank_df.y == 'yes').groupBy('marital').count().show()                       

+--------+-----+
| marital|count|
+--------+-----+
|divorced|  622|
| married| 2755|
|  single| 1912|
+--------+-----+



In [72]:
spark.sql("select marital, count(y) as count from bank where y='yes' group by marital").show()

+--------+-----+
| marital|count|
+--------+-----+
|divorced|  622|
| married| 2755|
|  single| 1912|
+--------+-----+



## i) Check if age and marital status together mattered for subscription to deposit scheme

In [46]:
bank_df.select('age','marital','y')\
        .filter(bank_df.y == 'yes').groupBy('age','marital')\
        .count().orderBy('age').show()  

+---+--------+-----+
|age| marital|count|
+---+--------+-----+
| 18|  single|    7|
| 19|  single|   11|
| 20|  single|   14|
| 20| married|    1|
| 21|  single|   21|
| 21| married|    1|
| 22|  single|   40|
| 23| married|    2|
| 23|  single|   42|
| 24|  single|   58|
| 24| married|   10|
| 25| married|   14|
| 25|  single|   99|
| 26| married|   13|
| 26|  single|  121|
| 27| married|   29|
| 27|  single|  110|
| 27|divorced|    2|
| 28| married|   20|
| 28|  single|  138|
+---+--------+-----+
only showing top 20 rows



In [73]:
spark.sql("select age,marital,count(y) as count from bank where y='yes' group by age,marital").show()

+---+--------+-----+
|age| marital|count|
+---+--------+-----+
| 42|  single|   22|
| 66| married|   22|
| 68|  single|    2|
| 28| married|   20|
| 59| married|   66|
| 61| married|   47|
| 21|  single|   21|
| 29|  single|  133|
| 70|divorced|    5|
| 56|  single|    6|
| 74| married|   11|
| 64|divorced|    4|
| 40|  single|   31|
| 45|  single|   14|
| 83|  single|    1|
| 69| married|   13|
| 77| married|   19|
| 57|divorced|   15|
| 21| married|    1|
| 53| married|   60|
+---+--------+-----+
only showing top 20 rows

