In [1]:
from pyspark.sql.functions import *

In [2]:
spark = SparkSession \
        .builder \
        .appName("TopQueries") \
        .getOrCreate()

In [3]:
spark

In [4]:
###  Read data form local filesystem

In [5]:
INPUT_PATH = '../data/w3data/search-click-logs/2019*.avro' # Input path with REGEX to use the avro files
OUTPUT_PATH = '../data/w3data/search-click-logs/output' # Output folder name to store generated reports

### Read data from stores with given format

#### Many Supported Data Sources "CSV", "JSON", "parquet", "avro", "JDBC/ODBC", "Plain text", "HDFS", "Cassandra", "HBase", "MongoDb" 
            

In [6]:
metrics_df = spark.read.format("avro").load(INPUT_PATH)

In [7]:
metrics_df.printSchema()

root
 |-- detectedDuplicate: boolean (nullable = true)
 |-- detectedCorruption: boolean (nullable = true)
 |-- firstInSession: boolean (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- remoteHost: string (nullable = true)
 |-- referer: string (nullable = true)
 |-- location: string (nullable = true)
 |-- viewportPixelWidth: integer (nullable = true)
 |-- viewportPixelHeight: integer (nullable = true)
 |-- screenPixelWidth: integer (nullable = true)
 |-- screenPixelHeight: integer (nullable = true)
 |-- partyId: string (nullable = true)
 |-- sessionId: string (nullable = true)
 |-- pageViewId: string (nullable = true)
 |-- eventType: string (nullable = true)
 |-- userAgentString: string (nullable = true)
 |-- userAgentName: string (nullable = true)
 |-- userAgentFamily: string (nullable = true)
 |-- userAgentVendor: string (nullable = true)
 |-- userAgentType: string (nullable = true)
 |-- userAgentVersion: string (nullable = true)
 |-- userAgentDeviceCategory: string (nulla

In [9]:
metrics_df.select("ibmQuery", "url", "title", "ibmEvType").show()

+--------------+--------------------+--------------------+----------+
|      ibmQuery|                 url|               title| ibmEvType|
+--------------+--------------------+--------------------+----------+
|  kyle freeman|https://w3.ibm.co...|FREEMAN, KYLE W (...|Dwell Time|
|  what is BAIW|https://w3-connec...|Tech Support Appl...|Impression|
|IBM counseling|https://w3-connec...|LA Legal Counsels...|Impression|
|  what is BAIW|https://w3-connec...|STATS (BAIW) Refe...|Impression|
|IBM counseling|https://d01db034....|MX-GS-0345 Rev. 1...|Impression|
|  what is BAIW|https://w3-connec...|Creating user id ...|Impression|
|          null|                null|                null|      null|
|  what is BAIW|https://w3.ibm.co...|CEDP BAIW *APPLIC...|Impression|
|    John petri|                null|                null|     Query|
|    John petri|https://w3.ibm.co...|Petri, John E (John)|Impression|
|  what is BAIW|https://w3.ibm.co...|CEDP BAIW ENTERCO...|Impression|
|  recall email|    

In [10]:
### TOP Queries ###

In [11]:
top_queries_df = metrics_df.select("ibmQuery", "ibmEvType") \
        .where(col("ibmQuery").isNotNull()) \
        .where (col("ibmEvType") == "Query") \
        .groupBy("ibmQuery") \
        .count() \
        .withColumnRenamed('count', 'total')\
        .orderBy("total", ascending=False)

In [12]:
top_queries_df.show()

+--------------------+-----+
|            ibmQuery|total|
+--------------------+-----+
|             workday|   11|
|Client executive ...|   11|
|          checkpoint|   10|
|                 box|    9|
|             buy@ibm|    8|
|   service apartment|    7|
|            itsc 300|    7|
|              murali|    7|
|Trans Tree Corpor...|    6|
|tuition reimbursment|    6|
|                bond|    5|
|red hat enterpris...|    5|
|                 pmr|    5|
|               webex|    5|
|time off service ...|    5|
|   field engineering|    5|
|           bluepages|    5|
|    retiree benefits|    5|
|                issi|    5|
|    Offering manager|    5|
+--------------------+-----+
only showing top 20 rows



### Spark SQL

In [13]:
metrics_df.registerTempTable("metrics")

In [14]:
sql_df = spark.sql("SELECT ibmQuery, count(*) as total "
           "FROM metrics "
           "WHERE ibmQuery is NOT null AND ibmEvType == 'Query'"
           "GROUP BY ibmQuery ORDER BY total desc ")

In [15]:
sql_df.show()

+--------------------+-----+
|            ibmQuery|total|
+--------------------+-----+
|             workday|   11|
|Client executive ...|   11|
|          checkpoint|   10|
|                 box|    9|
|             buy@ibm|    8|
|              murali|    7|
|   service apartment|    7|
|            itsc 300|    7|
|Trans Tree Corpor...|    6|
|tuition reimbursment|    6|
|                bond|    5|
|               webex|    5|
| career conversation|    5|
|    Offering manager|    5|
|   field engineering|    5|
|red hat enterpris...|    5|
|           bluepages|    5|
|    retiree benefits|    5|
|time off service ...|    5|
|                issi|    5|
+--------------------+-----+
only showing top 20 rows



### Logical and Physical Plans

In [16]:
top_queries_df.explain(extended=True)

== Parsed Logical Plan ==
'Sort ['total DESC NULLS LAST], true
+- Project [ibmQuery#33, count#141L AS total#144L]
   +- Aggregate [ibmQuery#33], [ibmQuery#33, count(1) AS count#141L]
      +- Filter (ibmEvType#31 = Query)
         +- Filter isnotnull(ibmQuery#33)
            +- Project [ibmQuery#33, ibmEvType#31]
               +- Relation[detectedDuplicate#0,detectedCorruption#1,firstInSession#2,timestamp#3L,remoteHost#4,referer#5,location#6,viewportPixelWidth#7,viewportPixelHeight#8,screenPixelWidth#9,screenPixelHeight#10,partyId#11,sessionId#12,pageViewId#13,eventType#14,userAgentString#15,userAgentName#16,userAgentFamily#17,userAgentVendor#18,userAgentType#19,userAgentVersion#20,userAgentDeviceCategory#21,userAgentOsFamily#22,userAgentOsVersion#23,... 23 more fields] avro

== Analyzed Logical Plan ==
ibmQuery: string, total: bigint
Sort [total#144L DESC NULLS LAST], true
+- Project [ibmQuery#33, count#141L AS total#144L]
   +- Aggregate [ibmQuery#33], [ibmQuery#33, count(1) AS coun

In [16]:
sql_df.explain(extended=True)

== Parsed Logical Plan ==
'Sort ['total DESC NULLS LAST], true
+- 'Aggregate ['ibmQuery], ['ibmQuery, 'count(1) AS total#137]
   +- 'Filter (isnotnull('ibmQuery) && ('ibmEvType = Query))
      +- 'UnresolvedRelation `metrics`

== Analyzed Logical Plan ==
ibmQuery: string, total: bigint
Sort [total#137L DESC NULLS LAST], true
+- Aggregate [ibmQuery#33], [ibmQuery#33, count(1) AS total#137L]
   +- Filter (isnotnull(ibmQuery#33) && (ibmEvType#31 = Query))
      +- SubqueryAlias `metrics`
         +- Relation[detectedDuplicate#0,detectedCorruption#1,firstInSession#2,timestamp#3L,remoteHost#4,referer#5,location#6,viewportPixelWidth#7,viewportPixelHeight#8,screenPixelWidth#9,screenPixelHeight#10,partyId#11,sessionId#12,pageViewId#13,eventType#14,userAgentString#15,userAgentName#16,userAgentFamily#17,userAgentVendor#18,userAgentType#19,userAgentVersion#20,userAgentDeviceCategory#21,userAgentOsFamily#22,userAgentOsVersion#23,... 23 more fields] avro

== Optimized Logical Plan ==
Sort [total#13

### Partitions, Repartitions, and coalesce

In [17]:
metrics_df.rdd.getNumPartitions()

4

In [18]:
top_queries_df.rdd.getNumPartitions()

7

In [19]:
top_queries_df.repartition(50).rdd.getNumPartitions()

50

In [20]:
top_queries_df.coalesce(2).rdd.getNumPartitions()

2

In [21]:
top_queries_df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("Overwrite") \
    .option("ignoreLeadingWhiteSpace", "false")\
    .option("ignoreTrailingWhiteSpace", "false")\
    .save(OUTPUT_PATH+"/top-queries")