# Spark session

In [116]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col, countDistinct, count, last
from pyspark.sql.types import IntegerType, StructType, StructField, StringType, DateType
# Spark session & context
spark = (SparkSession
         .builder
         .master("local")
         .appName("load-postgres")
         # Add postgres jar
         .config("spark.driver.extraClassPath", "/home/jovyan/work/jars/postgresql-9.4.1207.jar")
         .getOrCreate())
sc = spark.sparkContext

In [88]:
table_schema = StructType([StructField('user_id', IntegerType()),
                     StructField('regionid', StringType()),
                     StructField('duration', IntegerType()),
                     StructField('channel', StringType()),
                     StructField('unique_sess_id', StringType()),
                    StructField('app_id', IntegerType()),
                    StructField('device', StringType()),
                    StructField('cats', StringType()),
                    StructField('email', StringType()),
                    StructField('backend_id', IntegerType())])

In [89]:
df = spark.read.format("csv").option("header", True).schema(table_schema).load("../data/hiring_task.csv")

In [91]:
df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- regionid: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- channel: string (nullable = true)
 |-- unique_sess_id: string (nullable = true)
 |-- app_id: integer (nullable = true)
 |-- device: string (nullable = true)
 |-- cats: string (nullable = true)
 |-- email: string (nullable = true)
 |-- backend_id: integer (nullable = true)



In [5]:
df = df.withColumn('duration', df['duration'].cast(IntegerType()))

In [8]:
df = df.withColumn('backend_id', regexp_extract(col('email'), r'-([0-9]+)@zattoo.com', 1))

In [14]:
df.show(truncate=False, n=70)

+-------+--------+--------+---------------+--------------------------------+------+------+------------------+-------------------------+----------+
|user_id|regionid|duration|channel        |unique_sess_id                  |app_id|device|cats              |email                    |backend_id|
+-------+--------+--------+---------------+--------------------------------+------+------+------------------+-------------------------+----------+
|219132 |CN      |12      |HiQ_zdf        |15F7A7C14254AFA3-4422BBD0D068842|8     |ipad  |Serien            |15F7A7-5040036@zattoo.com|5040036   |
|219132 |CN      |8       |HiQ_zdf        |15F7A7C37B610D05-376C0D307DC977B|8     |ipad  |Series            |15F7A7-5040036@zattoo.com|5040036   |
|219132 |CN      |334     |HiQ_zdf        |15F7A7C37B610D05-376C0D307DC977B|8     |ipad  |Series            |15F7A7-5040036@zattoo.com|5040036   |
|219132 |CN      |384     |HiQ_sf1        |15F70EFF1E46491C-39DE290AC760B2E|8     |iphone|Series            |15F70E-50

In [35]:
df.groupBy('regionid').sum('duration').show()

                                                                                

+--------+-------------+
|regionid|sum(duration)|
+--------+-------------+
|      CN|    740498496|
|      AT|        58008|
|      PT|         3138|
|      CL|       570638|
|      AU|       143916|
|      GB|        12080|
|      ES|        90364|
|      US|        16546|
|      FR|       843548|
|      IT|        10748|
|      SK|        22554|
|      HU|        11240|
|      DK|           24|
+--------+-------------+





In [16]:
select('device', 'cats').where('regionid == "CN"').SHOW()

NameError: name 'select' is not defined

In [36]:
df.groupBy('device').sum('duration').show()

                                                                                

+-----------------+-------------+
|           device|sum(duration)|
+-----------------+-------------+
|           iphone|     29012480|
|          android|     43478454|
|        panasonic|       226344|
|          iPad2,1|      1432282|
|android_bigscreen|     62750592|
|   android_tablet|     13215236|
|     windows_xbox|       185236|
|        bigscreen|     30775728|
|  windows_desktop|     56893300|
|          iPad4,1|     12978942|
|              web|    256145006|
|             ipad|     65539254|
|          iPad1,1|       855878|
|          appletv|    142717424|
|            tizen|     26075144|
+-----------------+-------------+





In [37]:
df.groupBy('regionid').agg(countDistinct('unique_sess_id')).show()

                                                                                

+--------+---------------------+
|regionid|count(unique_sess_id)|
+--------+---------------------+
|      CN|               150501|
|      AT|                   45|
|      PT|                    2|
|      CL|                  184|
|      AU|                   40|
|      GB|                    5|
|      ES|                   47|
|      US|                    3|
|      FR|                  170|
|      IT|                    2|
|      SK|                   12|
|      HU|                    1|
|      DK|                    1|
+--------+---------------------+



In [38]:
df.groupBy('device').agg(countDistinct('unique_sess_id')).show()

                                                                                

+-----------------+---------------------+
|           device|count(unique_sess_id)|
+-----------------+---------------------+
|           iphone|                 6771|
|          android|                 8139|
|        panasonic|                  133|
|          iPad2,1|                  437|
|android_bigscreen|                12687|
|   android_tablet|                 2947|
|     windows_xbox|                   61|
|        bigscreen|                 6187|
|  windows_desktop|                 3797|
|          iPad4,1|                 2297|
|              web|                25067|
|             ipad|                32901|
|          iPad1,1|                  351|
|          appletv|                31144|
|            tizen|                18094|
+-----------------+---------------------+



In [39]:
df.groupBy('regionid').agg(count('unique_sess_id')).show()

+--------+---------------------+
|regionid|count(unique_sess_id)|
+--------+---------------------+
|      CN|               199374|
|      AT|                   56|
|      PT|                    2|
|      CL|                  247|
|      AU|                   47|
|      GB|                    6|
|      ES|                   52|
|      US|                    5|
|      FR|                  192|
|      IT|                    2|
|      SK|                   15|
|      HU|                    1|
|      DK|                    1|
+--------+---------------------+



In [40]:
df.groupBy('device').agg(count('unique_sess_id')).show()

+-----------------+---------------------+
|           device|count(unique_sess_id)|
+-----------------+---------------------+
|           iphone|                 8553|
|          android|                11130|
|        panasonic|                  152|
|          iPad2,1|                  552|
|android_bigscreen|                15794|
|   android_tablet|                 3892|
|     windows_xbox|                   78|
|        bigscreen|                 8041|
|  windows_desktop|                 7286|
|          iPad4,1|                 2945|
|              web|                40720|
|             ipad|                38826|
|          iPad1,1|                  428|
|          appletv|                41262|
|            tizen|                20341|
+-----------------+---------------------+



In [76]:
from pyspark.sql.functions import unix_timestamp, from_unixtime, current_timestamp, to_timestamp, to_date

In [84]:
df = spark.createDataFrame(
    [("11/25/1991",), ("11/24/1991",), ("11/30/1991",)], 
    ['date_str']
)
df.show()

+----------+
|  date_str|
+----------+
|11/25/1991|
|11/24/1991|
|11/30/1991|
+----------+



In [78]:
df2 = df.select(
    'date_str', 
    from_unixtime(unix_timestamp('date_str', 'mm/dd/yyy')).alias('date')
)
df3 = df.select('date_str', to_timestamp(df.date_str, 'MM/dd/yyy').alias('date'))
df4 = df.select('date_str', to_date(df.date_str, 'MM/dd/yyy').alias('date'))

In [79]:
df2.show()

+----------+-------------------+
|  date_str|               date|
+----------+-------------------+
|11/25/1991|1991-01-25 00:11:00|
|11/24/1991|1991-01-24 00:11:00|
|11/30/1991|1991-01-30 00:11:00|
+----------+-------------------+



In [80]:
df3.show()

+----------+-------------------+
|  date_str|               date|
+----------+-------------------+
|11/25/1991|1991-11-25 00:00:00|
|11/24/1991|1991-11-24 00:00:00|
|11/30/1991|1991-11-30 00:00:00|
+----------+-------------------+



In [81]:
df4.show()

+----------+----------+
|  date_str|      date|
+----------+----------+
|11/25/1991|1991-11-25|
|11/24/1991|1991-11-24|
|11/30/1991|1991-11-30|
+----------+----------+



In [101]:
import datetime
data2 =[(datetime.datetime(2022, 3, 4, 15, 16, 3, 607621),),
 (datetime.datetime(2022, 3, 5, 15, 16, 3, 607621),),
 (datetime.datetime(2022, 3, 6, 15, 16, 3, 607621),),
 (datetime.datetime(2022, 3, 7, 15, 16, 3, 607621),),
 (datetime.datetime(2022, 3, 8, 15, 16, 3, 607621),),
 (datetime.datetime(2022, 3, 9, 15, 16, 3, 607621),),
 (datetime.datetime(2022, 3, 10, 15, 16, 3, 607621),)]

schema = StructType([ \
    StructField("date",DateType())
  ])
 
df_date = spark.createDataFrame(data=data2,schema=schema)

In [102]:
df_date.show()

+----------+
|      date|
+----------+
|2022-03-04|
|2022-03-05|
|2022-03-06|
|2022-03-07|
|2022-03-08|
|2022-03-09|
|2022-03-10|
+----------+



In [113]:
now =  datetime.datetime.now()
df_date.filter(df_date.date.between(now.date(),(now+datetime.timedelta(days=3)).date())).show()

+----------+
|      date|
+----------+
|2022-03-04|
|2022-03-05|
|2022-03-06|
|2022-03-07|
+----------+



In [115]:
now.date()

datetime.date(2022, 3, 4)

In [136]:
df_date['date']

Column<'date'>

In [149]:
df_date.tail(1)[0]['date']

datetime.date(2022, 3, 10)

In [150]:
df_date.show()

+----------+
|      date|
+----------+
|2022-03-04|
|2022-03-05|
|2022-03-06|
|2022-03-07|
|2022-03-08|
|2022-03-09|
|2022-03-10|
+----------+

