In [1]:
from pyspark.sql import SparkSession

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [53]:
from pyspark.sql.functions import udf

In [5]:
spark = SparkSession.builder.appName('data_processing').getOrCreate()

In [4]:
schema = StructType().add('user_id',"string").add("country","string").add("browser","string").add("OS","string").add("age","integer")

In [6]:
df_custom = spark.createDataFrame([("A203",'India',"Chrome","WIN",33),("A201",'China',"Safari","MacOS",35),("A205",'UK',"Mozilla","Linux",25)],schema = schema)

In [8]:
df_custom.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- browser: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- age: integer (nullable = true)



In [9]:
df_custom.show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A203|  India| Chrome|  WIN| 33|
|   A201|  China| Safari|MacOS| 35|
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [10]:
df_na=spark.createDataFrame([("A203",None,"Chrome","WIN",33),("A201",'China',None,"MacOS",35),("A205",'UK',"Mozilla","Linux",25)],schema=schema)

In [11]:
df_na.show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A203|   null| Chrome|  WIN| 33|
|   A201|  China|   null|MacOS| 35|
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [12]:
df_na.fillna('0').show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A203|      0| Chrome|  WIN| 33|
|   A201|  China|      0|MacOS| 35|
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [13]:
df_na.fillna({'country':'USA','browser':'Google Chrome'}).show()

+-------+-------+-------------+-----+---+
|user_id|country|      browser|   OS|age|
+-------+-------+-------------+-----+---+
|   A203|    USA|       Chrome|  WIN| 33|
|   A201|  China|Google Chrome|MacOS| 35|
|   A205|     UK|      Mozilla|Linux| 25|
+-------+-------+-------------+-----+---+



In [14]:
df_na.na.drop().show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [15]:
df_na.na.drop(subset='country').show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A201|  China|   null|MacOS| 35|
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [16]:
df_na.replace("Chrome","Google Chrome").show()

+-------+-------+-------------+-----+---+
|user_id|country|      browser|   OS|age|
+-------+-------+-------------+-----+---+
|   A203|   null|Google Chrome|  WIN| 33|
|   A201|  China|         null|MacOS| 35|
|   A205|     UK|      Mozilla|Linux| 25|
+-------+-------+-------------+-----+---+



In [18]:
df_na.drop('user_id').show()

+-------+-------+-----+---+
|country|browser|   OS|age|
+-------+-------+-----+---+
|   null| Chrome|  WIN| 33|
|  China|   null|MacOS| 35|
|     UK|Mozilla|Linux| 25|
+-------+-------+-----+---+



In [20]:
df = spark.read.csv("customer_data.csv",header = True, inferSchema=True)

In [21]:
df.count()

2000

In [22]:
len(df.columns)

7

In [23]:
df.columns

['Customer_subtype',
 'Number_of_houses',
 'Avg_size_household',
 'Avg_age',
 'Customer_main_type',
 'Avg_Salary',
 'label']

In [24]:
df.printSchema()

root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)



In [25]:
df.show(3)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
only showing top 3 rows



In [26]:
df.summary().show()

+-------+--------------------+------------------+------------------+-----------+--------------------+-----------------+------------------+
|summary|    Customer_subtype|  Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|       Avg_Salary|             label|
+-------+--------------------+------------------+------------------+-----------+--------------------+-----------------+------------------+
|  count|                2000|              2000|              2000|       2000|                2000|             2000|              2000|
|   mean|                null|            1.1075|            2.6895|       null|                null|     1616908.0835|            0.0605|
| stddev|                null|0.3873225521186316|0.7914562220841646|       null|                null|6822647.757312146|0.2384705099001677|
|    min|Affluent senior a...|                 1|                 1|20-30 years|      Average Family|             1361|                 0|
|    25%|                nu

Subset: select, filter, where

In [27]:
df.select(['Customer_subtype','Avg_Salary']).show()

+--------------------+----------+
|    Customer_subtype|Avg_Salary|
+--------------------+----------+
|Lower class large...|     44905|
|Mixed small town ...|     37575|
|Mixed small town ...|     27915|
|Modern, complete ...|     19504|
|  Large family farms|     34943|
|    Young and rising|     13064|
|Large religious f...|     29090|
|Lower class large...|      6895|
|Lower class large...|     35497|
|     Family starters|     30800|
|       Stable family|     39157|
|Modern, complete ...|     40839|
|Lower class large...|     30008|
|        Mixed rurals|     37209|
|    Young and rising|     45361|
|Lower class large...|     45650|
|Traditional families|     18982|
|Mixed apartment d...|     30093|
|Young all america...|     27097|
|Low income catholics|     23511|
+--------------------+----------+
only showing top 20 rows



In [28]:
df.filter(df['Avg_Salary']>1000000).count()

128

In [30]:
df.columns

['Customer_subtype',
 'Number_of_houses',
 'Avg_size_household',
 'Avg_age',
 'Customer_main_type',
 'Avg_Salary',
 'label']

In [31]:
df.filter(df['Avg_Salary']>500000).filter(df['Number_of_houses']>2).show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    596723|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    944444|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    788477|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    994077|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+



In [32]:
df.where((df['Avg_Salary']>500000)&(df['Number_of_houses']>2)).show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    596723|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    944444|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    788477|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    994077|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+



Aggregate

In [33]:
df.groupBy('Customer_subtype').count().show()

+--------------------+-----+
|    Customer_subtype|count|
+--------------------+-----+
|Large family, emp...|   56|
|Religious elderly...|   47|
|Large religious f...|  107|
|Modern, complete ...|   93|
|    Village families|   68|
|Young all america...|   62|
|Young urban have-...|    4|
|Young seniors in ...|   22|
|Fresh masters in ...|    2|
|High Income, expe...|   52|
|Lower class large...|  288|
| Residential elderly|    6|
|Senior cosmopolitans|    1|
|        Mixed rurals|   67|
|Career and childcare|   33|
|Low income catholics|   72|
|Mixed apartment d...|   34|
|Seniors in apartm...|   17|
|Middle class fami...|  122|
|Traditional families|  129|
+--------------------+-----+
only showing top 20 rows



In [35]:
for col in df.columns:
    if col!='Avg_Salary':
        print(f" Aggregation for {col}")
        df.groupBy(col).count().orderBy('count',ascending=False).show(truncate=False)

 Aggregation for Customer_subtype
+------------------------------------------+-----+
|Customer_subtype                          |count|
+------------------------------------------+-----+
|Lower class large families                |288  |
|Traditional families                      |129  |
|Middle class families                     |122  |
|Large religious families                  |107  |
|Modern, complete families                 |93   |
|Couples with teens 'Married with children'|83   |
|Young and rising                          |78   |
|High status seniors                       |76   |
|Low income catholics                      |72   |
|Mixed seniors                             |71   |
|Village families                          |68   |
|Mixed rurals                              |67   |
|Young all american family                 |62   |
|Stable family                             |62   |
|Young, low educated                       |56   |
|Large family, employed child              |56  

In [37]:
df.groupBy('Customer_main_type').agg(F.mean('Avg_Salary')).show()

+--------------------+--------------------+
|  Customer_main_type|     avg(Avg_Salary)|
+--------------------+--------------------+
|             Farmers|  30209.333333333332|
|       Career Loners|             32272.6|
|Retired and Relig...|   27338.80693069307|
|Successful hedonists|1.6278923510309279E7|
|         Living well|  31194.044943820223|
|      Average Family|  104256.62337662338|
|    Cruising Seniors|  28870.333333333332|
|Conservative fami...|  29504.419491525423|
|      Driven Growers|   30769.04069767442|
|Family with grown...|  28114.191881918818|
+--------------------+--------------------+



In [42]:
df.groupBy('Customer_main_type').agg(F.max('Avg_Salary')).orderBy('max(Avg_Salary)',ascending=False).show()

+--------------------+---------------+
|  Customer_main_type|max(Avg_Salary)|
+--------------------+---------------+
|Successful hedonists|       48919896|
|      Average Family|         991838|
|             Farmers|          49965|
|Conservative fami...|          49965|
|      Driven Growers|          49932|
|       Career Loners|          49903|
|Family with grown...|          49901|
|         Living well|          49816|
|Retired and Relig...|          49564|
|    Cruising Seniors|          49526|
+--------------------+---------------+



In [43]:
df.sort("Avg_Salary",ascending=False).show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
| High status seniors|               1|                 2|60-70 years|Successful hedonists|  48919896|    0|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48177970|    0|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48069548|    1|
|High Income, expe...|               1|                 3|40-50 years|Successful hedonists|  46911924|    0|
| High status seniors|               1|                 3|40-50 years|Successful hedonists|  46614009|    0|
|High Income, expe...|               1|                 3|30-40 years|Successful hedonists|  45952441|    0|
|High Income, expe.

df.groupBy('Customer_subtype').agg(F.avg('Avg_Salary').alias('mean_salary')).orderBy('mean_salary',ascending=False).show()

In [48]:
df.groupBy('Customer_subtype').agg(F.max('Avg_Salary').alias('max_salary')).orderBy('max_salary',ascending=False).show()

+--------------------+----------+
|    Customer_subtype|max_salary|
+--------------------+----------+
| High status seniors|  48919896|
|High Income, expe...|  48177970|
|Affluent senior a...|    994077|
|Affluent young fa...|    991838|
|  Large family farms|     49965|
|Traditional families|     49965|
|Middle class fami...|     49932|
|Senior cosmopolitans|     49903|
|Mixed small town ...|     49901|
|Lower class large...|     49899|
|       Mixed seniors|     49876|
|    Young and rising|     49816|
|        Mixed rurals|     49785|
|Modern, complete ...|     49729|
| Young, low educated|     49626|
|Mixed apartment d...|     49621|
|     Family starters|     49602|
|    Village families|     49575|
|Religious elderly...|     49564|
|       Stable family|     49548|
+--------------------+----------+
only showing top 20 rows



In [50]:
df.groupBy("Customer_subtype").agg(F.collect_set("Number_of_houses")).show()

+--------------------+-----------------------------+
|    Customer_subtype|collect_set(Number_of_houses)|
+--------------------+-----------------------------+
|Large family, emp...|                       [1, 2]|
|Religious elderly...|                       [1, 2]|
|Large religious f...|                       [1, 2]|
|Modern, complete ...|                       [1, 2]|
|    Village families|                       [1, 2]|
|Young all america...|                       [1, 2]|
|Young urban have-...|                       [1, 2]|
|Young seniors in ...|                    [1, 2, 3]|
|Fresh masters in ...|                          [1]|
|High Income, expe...|                          [1]|
|Lower class large...|                       [1, 2]|
| Residential elderly|                    [1, 2, 3]|
|Senior cosmopolitans|                          [3]|
|        Mixed rurals|                          [1]|
|Career and childcare|                       [1, 2]|
|Low income catholics|                        

In [52]:
df = df.withColumn('constant',F.lit('finance'))
df.select(['Customer_subtype','constant']).show()

+--------------------+--------+
|    Customer_subtype|constant|
+--------------------+--------+
|Lower class large...| finance|
|Mixed small town ...| finance|
|Mixed small town ...| finance|
|Modern, complete ...| finance|
|  Large family farms| finance|
|    Young and rising| finance|
|Large religious f...| finance|
|Lower class large...| finance|
|Lower class large...| finance|
|     Family starters| finance|
|       Stable family| finance|
|Modern, complete ...| finance|
|Lower class large...| finance|
|        Mixed rurals| finance|
|    Young and rising| finance|
|Lower class large...| finance|
|Traditional families| finance|
|Mixed apartment d...| finance|
|Young all america...| finance|
|Low income catholics| finance|
+--------------------+--------+
only showing top 20 rows



In [54]:
df.groupBy('Avg_age').count().show()

+-----------+-----+
|    Avg_age|count|
+-----------+-----+
|70-80 years|    8|
|50-60 years|  373|
|30-40 years|  496|
|20-30 years|   31|
|60-70 years|   64|
|40-50 years| 1028|
+-----------+-----+



In [55]:
def age_category(age):
    if age == "20-30 years":
        return "young"
    elif age=="30-40 years":
        return "Mid Aged"
    elif((age=="40-50 years") or (age == "50-60 years")):
        return "old"
    else:
        return "very old"

In [56]:
age_udf=udf(age_category,StringType())

In [57]:
df=df.withColumn('age_category',age_udf(df['Avg_age']))

In [58]:
df.select('Avg_age','age_category').show()

+-----------+------------+
|    Avg_age|age_category|
+-----------+------------+
|30-40 years|    Mid Aged|
|30-40 years|    Mid Aged|
|30-40 years|    Mid Aged|
|40-50 years|         old|
|30-40 years|    Mid Aged|
|20-30 years|       young|
|30-40 years|    Mid Aged|
|40-50 years|         old|
|50-60 years|         old|
|40-50 years|         old|
|40-50 years|         old|
|40-50 years|         old|
|40-50 years|         old|
|40-50 years|         old|
|30-40 years|    Mid Aged|
|40-50 years|         old|
|40-50 years|         old|
|40-50 years|         old|
|30-40 years|    Mid Aged|
|50-60 years|         old|
+-----------+------------+
only showing top 20 rows



In [60]:
age_udf()

Column<b'age_category()'>

3 types of pandas UDF: scalar, grouped map, grouped agg

In [61]:
df.select('Avg_Salary').summary().show()

+-------+-----------------+
|summary|       Avg_Salary|
+-------+-----------------+
|  count|             2000|
|   mean|     1616908.0835|
| stddev|6822647.757312146|
|    min|             1361|
|    25%|            20315|
|    50%|            31421|
|    75%|            42949|
|    max|         48919896|
+-------+-----------------+



In [62]:
min_sal=1361
max_sal = 48919896

In [63]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [64]:
def scaled_salary(salary):
    scaled_sal = (salary-min_sal)/(max_sal-min_sal)
    return scaled_sal

In [66]:
scaling_udf = pandas_udf(scaled_salary,DoubleType())

In [68]:
#df.withColumn("scaled_salary",scaling_udf(df['Avg_Salary'])).show(10,False)

merge

In [74]:
region_data = spark.createDataFrame([('Family with grown ups','PN'),('Driven Growers','GJ'),('Conservative families','DD'), \
                     ('Cruising Seniors','DL'),('Average Family ','MN'),('Living well','KA'),('Successful hedonists','JH'),('Retired and Religious','AX'),('Career Loners','HY'),('Farmers','JH')],schema=StructType().add("Customer_main_type","string").add("Region Code","string"))

In [75]:
region_data.show()

+--------------------+-----------+
|  Customer_main_type|Region Code|
+--------------------+-----------+
|Family with grown...|         PN|
|      Driven Growers|         GJ|
|Conservative fami...|         DD|
|    Cruising Seniors|         DL|
|     Average Family |         MN|
|         Living well|         KA|
|Successful hedonists|         JH|
|Retired and Relig...|         AX|
|       Career Loners|         HY|
|             Farmers|         JH|
+--------------------+-----------+



In [76]:
new_df=df.join(region_data,on='Customer_main_type')

In [77]:
new_df.groupBy("Region Code").count().show()

+-----------+-----+
|Region Code|count|
+-----------+-----+
|         JH|  287|
|         HY|   15|
|         DD|  236|
|         DL|   60|
|         GJ|  172|
|         PN|  542|
|         KA|  178|
|         AX|  202|
+-----------+-----+



pivot view

In [79]:
df.groupBy('Customer_main_type').pivot('Avg_age').sum('Avg_Salary').fillna(0).show()

+--------------------+-----------+-----------+-----------+-----------+-----------+-----------+
|  Customer_main_type|20-30 years|30-40 years|40-50 years|50-60 years|60-70 years|70-80 years|
+--------------------+-----------+-----------+-----------+-----------+-----------+-----------+
|             Farmers|          0|     462027|    2031235|     316206|          0|          0|
|       Career Loners|     143998|     176639|      25701|     105193|      32558|          0|
|Retired and Relig...|     126350|     336631|    2975266|    1687711|     335357|      61124|
|Successful hedonists|      42261|  171278764| 1223362814| 1563071675|  200340129|      15518|
|         Living well|     460528|    2965303|    1795405|     331304|          0|          0|
|      Average Family|          0|   23682805|    7789464|     412490|     226281|          0|
|    Cruising Seniors|          0|      43302|     303601|     529354|     716425|     139538|
|Conservative fami...|      69390|    2381485|    

In [81]:
df.groupBy('Customer_main_type').pivot('label').sum('Avg_Salary').fillna(0).show()

+--------------------+----------+---------+
|  Customer_main_type|         0|        1|
+--------------------+----------+---------+
|             Farmers|   2734832|    74636|
|       Career Loners|    484089|        0|
|Retired and Relig...|   5328410|   194029|
|Successful hedonists|2720381462|437729699|
|         Living well|   5453384|    99156|
|      Average Family|  26036999|  6074041|
|    Cruising Seniors|   1675841|    56379|
|Conservative fami...|   6595027|   368016|
|      Driven Growers|   4492465|   799810|
|Family with grown...|  14394094|   843798|
+--------------------+----------+---------+



window functions: Aggregations, rankings, Analytics

In [82]:
from pyspark.sql.window import Window

In [83]:
from pyspark.sql.functions import col, row_number

In [85]:
win = Window.orderBy(df['Avg_Salary'].desc())

In [90]:
df = df.withColumn('rank',row_number().over(win).alias('rank'))

In [91]:
df.show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|constant|age_category|rank|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
| High status seniors|               1|                 2|60-70 years|Successful hedonists|  48919896|    0| finance|    very old|   1|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48177970|    0| finance|         old|   2|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48069548|    1| finance|         old|   3|
|High Income, expe...|               1|                 3|40-50 years|Successful hedonists|  46911924|    0| finance|         old|   4|
| High status seniors|               1|         

In [92]:
win_1 = Window.partitionBy("Customer_subtype").orderBy(df['Avg_Salary'].desc())

In [94]:
df = df.withColumn('rank',row_number().over(win_1).alias('rank'))

In [95]:
df.groupBy('rank').count().orderBy('rank').show()

+----+-----+
|rank|count|
+----+-----+
|   1|   39|
|   2|   37|
|   3|   36|
|   4|   36|
|   5|   34|
|   6|   34|
|   7|   32|
|   8|   31|
|   9|   31|
|  10|   31|
|  11|   31|
|  12|   31|
|  13|   31|
|  14|   31|
|  15|   31|
|  16|   30|
|  17|   30|
|  18|   27|
|  19|   27|
|  20|   27|
+----+-----+
only showing top 20 rows



In [96]:
df.filter(col('rank')<4).show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|constant|age_category|rank|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
|Large family, emp...|               2|                 3|30-40 years|Family with grown...|     49418|    0| finance|    Mid Aged|   1|
|Large family, emp...|               1|                 4|40-50 years|Family with grown...|     48390|    0| finance|         old|   2|
|Large family, emp...|               1|                 3|40-50 years|Family with grown...|     48272|    0| finance|         old|   3|
|Religious elderly...|               1|                 2|50-60 years|Retired and Relig...|     49564|    0| finance|         old|   1|
|Religious elderly...|               1|         

chapter - 3

s

In [98]:
sparkS=SparkSession.builder.appName('structured_streaming').getOrCreate()

In [99]:
df_1=sparkS.createDataFrame([("XN203",'FB',300,30),("XN201",'Twitter',10,19),("XN202",'Insta',500,45)],["user_id","app","time_in_secs","age"]).write.csv("csv_folder",mode='append')

In [100]:
schema1= StructType().add("user_id","string").add("app","string").add("time_in_secs","integer").add("age","integer")

In [105]:
data = sparkS.readStream.option("sep",",").schema(schema1).csv("csv_folder")

In [106]:
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- app: string (nullable = true)
 |-- time_in_secs: integer (nullable = true)
 |-- age: integer (nullable = true)



In [107]:
app_count = data.groupBy('app').count()

In [110]:
query = (app_count.writeStream.queryName('count_query').outputMode('complete').format('memory').start())

In [111]:
sparkS.sql("select * from count_query").toPandas().head(5)

Unnamed: 0,app,count
0,Insta,1
1,FB,1
2,Twitter,1


In [112]:
fb_data= data.filter(data['app']=='FB')

In [113]:
fb_avg_time = fb_data.groupBy('user_id').agg(F.avg("time_in_secs"))

In [114]:
fb_query = (fb_avg_time.writeStream.queryName('fb_query').outputMode('complete').format('memory').start())

In [115]:
sparkS.sql("select * from fb_query").toPandas().head(5)

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,300.0


In [124]:
df_2=spark.createDataFrame([("XN203",'FB',100,30),("XN201",'FB',10,19),("XN202",'FB',2000,45)],["user_id","app","time_in_secs","age"]).write.csv("csv_folder",mode='append')

In [125]:
spark.sql("select * from fb_query ").toPandas().head(5)

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,300.0
1,XN201,10.0
2,XN202,2000.0


In [126]:
df_3=spark.createDataFrame([("XN203",'FB',500,30),("XN201",'Insta',30,19),("XN202",'Twitter',100,45)],["user_id","app","time_in_secs","age"]).write.csv("csv_folder",mode='append')

In [127]:
spark.sql("select * from fb_query ").toPandas().head(5)

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,300.0
1,XN201,10.0
2,XN202,2000.0


In [128]:
app_df=data.groupBy('app').agg(F.sum('time_in_secs').alias('total_time')).orderBy('total_time',ascending=False)

In [129]:
app_query=(app_df.writeStream.queryName('app_wise_query').outputMode('complete').format('memory').start())

IllegalArgumentException: 'Cannot start query with name app_wise_query as a query with that name is already active'

In [130]:
spark.sql("select * from app_wise_query ").toPandas().head(5)

Unnamed: 0,app,total_time
0,FB,5020
1,Insta,530
2,Twitter,110


In [136]:
df_4=spark.createDataFrame([("XN203",'FB',500,30),("XN201",'Insta',30,19),("XN202",'Twitter',100,45)],["user_id","app","time_in_secs","age"]).write.csv("csv_folder",mode='append')

In [137]:
spark.sql("select * from app_wise_query ").toPandas().head(5)

Unnamed: 0,app,total_time
0,FB,5520
1,Insta,560
2,Twitter,210


In [138]:
age_df=data.groupBy('app').agg(F.avg('age').alias('mean_age')).orderBy('mean_age',ascending=False)

In [139]:
age_query=(age_df.writeStream.queryName('age_query').outputMode('complete').format('memory').start())

In [140]:
df_5=spark.createDataFrame([("XN210",'FB',500,50),("XN255",'Insta',30,23),("XN222",'Twitter',100,30)],["user_id","app","time_in_secs","age"]).write.csv("csv_folder",mode='append')

In [143]:
spark.sql("select * from age_query ").toPandas().head(5)

Unnamed: 0,app,mean_age
0,Twitter,39.8
1,FB,30.727273
2,Insta,24.2


In [144]:
app_df=spark.createDataFrame([('FB','FACEBOOK'),('Insta','INSTAGRAM'),('Twitter','TWITTER')],["app", "full_name"])

In [145]:
app_df.show()

+-------+---------+
|    app|full_name|
+-------+---------+
|     FB| FACEBOOK|
|  Insta|INSTAGRAM|
|Twitter|  TWITTER|
+-------+---------+

