In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
adasampledf = spark.read.option("header", "true").option("inferSchema", "true").csv('app_log.csv').toDF("user_id","connection_type","Date_Time")

In [5]:
adasampledf.show()

+-------+---------------+--------------------+
|user_id|connection_type|           Date_Time|
+-------+---------------+--------------------+
|      1|           wifi|2018-09-08 00:23:...|
|      2|       CELLULAR|2018-09-08 00:23:...|
|      3|           wifi|2018-09-08 00:23:...|
|      1|       CELLULAR|2018-09-08 00:23:...|
|      2|           wifi|2018-09-09 00:23:...|
|      3|       CELLULAR|2018-09-10 00:23:...|
|      1|           wifi|2018-09-10 00:23:...|
|      1|       CELLULAR|2018-09-07 00:23:...|
|      2|       CELLULAR|2018-09-08 00:23:...|
+-------+---------------+--------------------+



In [6]:
adasample_replaced_1 = adasampledf.withColumn('connection_type', regexp_replace('connection_type','wifi','W'))
adasample_replaced_2 = adasample_replaced_1.withColumn('connection_type', regexp_replace('connection_type','CELLULAR','C'))

In [7]:
adasample_replaced_2.show()

+-------+---------------+--------------------+
|user_id|connection_type|           Date_Time|
+-------+---------------+--------------------+
|      1|              W|2018-09-08 00:23:...|
|      2|              C|2018-09-08 00:23:...|
|      3|              W|2018-09-08 00:23:...|
|      1|              C|2018-09-08 00:23:...|
|      2|              W|2018-09-09 00:23:...|
|      3|              C|2018-09-10 00:23:...|
|      1|              W|2018-09-10 00:23:...|
|      1|              C|2018-09-07 00:23:...|
|      2|              C|2018-09-08 00:23:...|
+-------+---------------+--------------------+



In [8]:
adasample_replaced_2.createOrReplaceTempView('adasample_tbl1')

In [9]:
query = '''select TO_DATE(CAST(UNIX_TIMESTAMP(Date_Time, 'MM/dd/yyyy') AS TIMESTAMP)) AS newdate from adasample_tbl1'''
dfj = spark.sql(query)

In [10]:
dfj.show()

+----------+
|   newdate|
+----------+
|2018-09-08|
|2018-09-08|
|2018-09-08|
|2018-09-08|
|2018-09-09|
|2018-09-10|
|2018-09-10|
|2018-09-07|
|2018-09-08|
+----------+



In [11]:
adasampledf1 = spark.read.option("header", "true").option("inferSchema", "true").csv('user.csv').toDF("user_id","user_name")

In [12]:
adasampledf1.show()

+-------+---------+
|user_id|user_name|
+-------+---------+
|      1|   joshua|
|      2|     john|
|      3|     paul|
|      1|   joshua|
|      2|       jo|
+-------+---------+



In [13]:
adasampledf1.createOrReplaceTempView('adasample_tbl2')

In [14]:
query = '''select * from adasample_tbl2'''
dfj = spark.sql(query)

In [15]:
dfj.show()

+-------+---------+
|user_id|user_name|
+-------+---------+
|      1|   joshua|
|      2|     john|
|      3|     paul|
|      1|   joshua|
|      2|       jo|
+-------+---------+



In [16]:
query = '''select
            b.user_name,
            a.user_id,
            TO_DATE(CAST(UNIX_TIMESTAMP(a.Date_Time, 'MM/dd/yyyy') AS TIMESTAMP)) AS date,
            count(a.user_id) as count
            from adasample_tbl1 a join adasample_tbl2 b 
            on a.user_id = b.user_id 
            group by user_name,a.user_id,TO_DATE(CAST(UNIX_TIMESTAMP(a.Date_Time, 'MM/dd/yyyy') AS TIMESTAMP)) '''

In [17]:
dfj = spark.sql(query)

In [18]:
dfj.show()

+---------+-------+----------+-----+
|user_name|user_id|      date|count|
+---------+-------+----------+-----+
|     paul|      3|2018-09-10|    1|
|   joshua|      1|2018-09-10|    2|
|       jo|      2|2018-09-08|    2|
|   joshua|      1|2018-09-07|    2|
|     paul|      3|2018-09-08|    1|
|     john|      2|2018-09-09|    1|
|     john|      2|2018-09-08|    2|
|       jo|      2|2018-09-09|    1|
|   joshua|      1|2018-09-08|    4|
+---------+-------+----------+-----+



In [19]:
query = '''select
            user_name,
            sum(visitcount) as visit_count,
            month(date) as dt_mn
            from(
            select
            b.user_name,
            a.user_id,
            TO_DATE(CAST(UNIX_TIMESTAMP(a.Date_Time, 'MM/dd/yyyy') AS TIMESTAMP)) AS date,
            count(a.user_id) as visitcount
            from adasample_tbl1 a join adasample_tbl2 b 
            on a.user_id = b.user_id 
            group by user_name,a.user_id,TO_DATE(CAST(UNIX_TIMESTAMP(a.Date_Time, 'MM/dd/yyyy') AS TIMESTAMP))) base group by user_name,month(date)'''

In [20]:
dfj = spark.sql(query)

In [21]:
dfj.show()

+---------+-----------+-----+
|user_name|visit_count|dt_mn|
+---------+-----------+-----+
|       jo|          3|    9|
|     john|          3|    9|
|     paul|          2|    9|
|   joshua|          8|    9|
+---------+-----------+-----+



In [22]:
query = '''select
            user_name,
            visit_count,
            case when visit_count >= '3' then 'H'
            when visit_count = '2' then 'M'
            when visit_count <= '1' then 'L'
            end as grade
            from(
            select
            user_name,
            sum(visitcount) as visit_count,
            month(date) as dt_mn
            from(
            select
            b.user_name,
            a.user_id,
            TO_DATE(CAST(UNIX_TIMESTAMP(a.Date_Time, 'MM/dd/yyyy') AS TIMESTAMP)) AS date,
            count(a.user_id) as visitcount
            from adasample_tbl1 a join adasample_tbl2 b 
            on a.user_id = b.user_id 
            group by user_name,a.user_id,TO_DATE(CAST(UNIX_TIMESTAMP(a.Date_Time, 'MM/dd/yyyy') AS TIMESTAMP))) base group by user_name,month(date)) new'''

In [23]:
dfj = spark.sql(query)

In [24]:
dfj.show()

+---------+-----------+-----+
|user_name|visit_count|grade|
+---------+-----------+-----+
|       jo|          3|    H|
|     john|          3|    H|
|     paul|          2|    M|
|   joshua|          8|    H|
+---------+-----------+-----+



In [30]:
adasmpljson = spark.read.json('part-00001')

In [31]:
adasmpljson.show(5,False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+------------------------------------------------------+-----------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+-------------------+-------------------+-------------------+--------+--------------------------------+-------------+--------+-------------+----+
|asn_exchange                                                                                                                                                                |brq|brq_carrier                   

In [32]:
adasmpljson.printSchema()

root
 |-- asn_exchange: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- asn: string (nullable = true)
 |    |    |-- brq: long (nullable = true)
 |    |    |-- bundle: string (nullable = true)
 |    |    |-- connection_types: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- brq: long (nullable = true)
 |    |    |    |    |-- connection_type: string (nullable = true)
 |    |    |    |    |-- first_seen: string (nullable = true)
 |    |    |    |    |-- last_seen: string (nullable = true)
 |    |    |    |    |-- num_days: long (nullable = true)
 |    |    |-- exchange: string (nullable = true)
 |    |    |-- first_seen: string (nullable = true)
 |    |    |-- last_seen: string (nullable = true)
 |    |    |-- num_days: long (nullable = true)
 |-- brq: long (nullable = true)
 |-- brq_carrier: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- brq: long (nu

In [36]:
cols = ['device.ifa']
adasmpljson.select(cols).show(10,False)

+------------------------------------+
|ifa                                 |
+------------------------------------+
|35503e6e-ad86-4f2e-816b-621ae67d06a3|
|                                    |
|583b7a6a-582e-418d-8446-e15d23c09c4d|
|04f27685-4e42-4b7c-a839-f3958633a0f6|
|b57f1839-7f94-47be-bb01-dfc7f5497f7c|
|d85d7d66-d654-4225-8d42-a048ad90d96e|
|c508ff8c-8455-4e81-8aec-d6921b89fa41|
|b972f1fd-9edc-4e2f-8411-bc75819b5235|
|e9b14cab-616f-490f-b7d0-5363f45b569e|
|769e97d6-d6c2-40d7-b835-0127410e2400|
+------------------------------------+
only showing top 10 rows



In [40]:
adasmpljson.createOrReplaceTempView('jsonsmpltbl')

In [44]:
query = '''select distinct(length(device.ifa)) as ifale from jsonsmpltbl'''

In [45]:
jsonq = spark.sql(query)

In [46]:
jsonq.show()

+-----+
|ifale|
+-----+
|    0|
|   36|
+-----+



In [47]:
query = '''select count(1) from jsonsmpltbl where length(device.ifa) = 0 '''

In [48]:
jsonq1 = spark.sql(query)

In [49]:
jsonq1.show()

+--------+
|count(1)|
+--------+
|      52|
+--------+



In [52]:
query = '''select count(1) from jsonsmpltbl '''
jsonq2 = spark.sql(query)

In [53]:
jsonq2.show()

+--------+
|count(1)|
+--------+
|    5027|
+--------+



In [58]:
adasmpljson_fix = adasmpljson.filter("length(device.ifa) != 0 ")

In [59]:
cols = ['device.ifa']
adasmpljson_fix.select(cols).show(10,False)

+------------------------------------+
|ifa                                 |
+------------------------------------+
|35503e6e-ad86-4f2e-816b-621ae67d06a3|
|583b7a6a-582e-418d-8446-e15d23c09c4d|
|04f27685-4e42-4b7c-a839-f3958633a0f6|
|b57f1839-7f94-47be-bb01-dfc7f5497f7c|
|d85d7d66-d654-4225-8d42-a048ad90d96e|
|c508ff8c-8455-4e81-8aec-d6921b89fa41|
|b972f1fd-9edc-4e2f-8411-bc75819b5235|
|e9b14cab-616f-490f-b7d0-5363f45b569e|
|769e97d6-d6c2-40d7-b835-0127410e2400|
|e957cf62-112f-4586-ae55-cb7563514824|
+------------------------------------+
only showing top 10 rows



In [65]:
adasmpljson_fix.createOrReplaceTempView('jsonsmpltbl_fix')

In [66]:
query = '''select count(1) from jsonsmpltbl_fix '''
jsonq3 = spark.sql(query)

In [67]:
jsonq3.show()

+--------+
|count(1)|
+--------+
|    4975|
+--------+



In [70]:
query = '''select brq_carrier.carrier,count(device.ifa) from jsonsmpltbl group by brq_carrier.carrier '''
jsonq4 = spark.sql(query)

In [71]:
jsonq4.show()

+--------------------+--------------------------+
|             carrier|count(device.ifa AS `ifa`)|
+--------------------+--------------------------+
|[502-16, u mobile...|                         1|
|            [214-01]|                         1|
|           [502-152]|                         6|
|    [502-19, 502-16]|                         1|
|    [502-19, 502-18]|                         1|
|            [502-12]|                      1033|
|[502-18, digi, 50...|                         1|
|[digi, celcom, 50...|                         1|
|            [450-08]|                         1|
|            [502-19]|                       975|
|            [510-10]|                         3|
|    [502-16, 525-03]|                         1|
|          [u mobile]|                        97|
|            [454-04]|                         1|
|            [525-05]|                         5|
|    [502-12, 502-16]|                         2|
|    [502-16, 502-12]|                         3|
