<link rel='stylesheet' href='../assets/css/main.css'/>

# Large Joins

## Overview

We will join a large dataset with itself (a large x large self-join).

Find out what percentage of transaction requests get a response in under 50 milliseconds.

- **request_time** is the `timestamp` in the request row
- **response_time** is the `timestamp` in the response row (in response rows, `ref_id` and `response_code` are not null)

Find the elapsed time between these two.
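
The calculation can be sketched on a handful of in-memory rows before running it at scale. The rows below are made-up sample values (not taken from the lab data), and the sketch assumes that request rows have no `ref_id` while each response row carries the `id` of its request in `ref_id`.

```python
from datetime import datetime, timedelta

# Hypothetical sample rows: requests have ref_id = None,
# responses point back to their request through ref_id.
rows = [
    {"id": "r1", "timestamp": "2024-01-01 10:00:00.000", "ref_id": None},
    {"id": "r2", "timestamp": "2024-01-01 10:00:01.000", "ref_id": None},
    {"id": "s1", "timestamp": "2024-01-01 10:00:00.030", "ref_id": "r1"},
    {"id": "s2", "timestamp": "2024-01-01 10:00:01.200", "ref_id": "r2"},
]


def parse(ts):
    """Parse a timestamp string with fractional seconds."""
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")


# Index every row by id so each response can look up its request.
by_id = {row["id"]: row for row in rows}

elapsed_ms = []
for row in rows:
    request = by_id.get(row["ref_id"])
    if request is None:
        continue  # this row is a request, or its request is missing
    delta = parse(row["timestamp"]) - parse(request["timestamp"])
    elapsed_ms.append(delta / timedelta(milliseconds=1))

quick = sum(1 for ms in elapsed_ms if ms < 50)
percentage = quick * 100 / len(elapsed_ms)
print(elapsed_ms, percentage)
```

The rest of the lab performs the same lookup as a join in Spark, so it can run over the full dataset.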

## Duration

30 mins

## Depends on

[Lab 9.1](9-1_join-1.ipynb)


## Step-1: Verify datasets

We will join transaction data with itself.

- transactions data (large data).  Sample data is in `data/transactions/transactions-sample.csv`

Optionally, verify that this data is also available in HDFS.
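
One quick way to verify the sample file is to print its header and first few rows. This is a minimal sketch: `peek_csv` is a hypothetical helper (not part of the lab), and it assumes the sample file starts with a header row and that the notebook runs from a directory where the relative path resolves.

```python
import csv
import os


def peek_csv(path, n=5):
    """Return the header and the first n data rows of a CSV file."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = [row for _, row in zip(range(n), reader)]
    return header, rows


# Path as given in this lab; adjust it to your working directory.
sample_path = "data/transactions/transactions-sample.csv"

if os.path.exists(sample_path):
    header, rows = peek_csv(sample_path)
    print(header)
    for row in rows:
        print(row)
else:
    print("Sample file not found:", sample_path)
```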


## Step-2: Start up Spark

In [None]:
try:
    spark
except NameError:
    import findspark
    findspark.init()  # uses SPARK_HOME
    print("Spark found in : ", findspark.find())

    import pyspark
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    # use a unique temp dir for the warehouse dir, so we can run multiple Spark sessions in one dir
    import tempfile
    tmpdir = tempfile.TemporaryDirectory()

    config = ( SparkConf()
             .setAppName("TestApp")
             .setMaster("local[*]")
             .set('spark.executor.memory', '2g')
             .set('spark.sql.warehouse.dir', tmpdir.name)
             .set("some_property", "some_value") # another example
             )

    spark = SparkSession.builder.config(conf=config).getOrCreate()
    sc = spark.sparkContext

print('Spark UI running on port ' + spark.sparkContext.uiWebUrl.split(':')[2])

## Step-3: Load data and register table

We provide an explicit schema so that the `timestamp` column is parsed as a timestamp rather than as a string.

In [None]:
## TODO : adjust the limit 
## Keep it small for experimenting, switch to 'ALL' to load the full dataset

limit_rows = 1000
# limit_rows = 'ALL'

In [None]:
import pyspark
from pyspark.sql.types import ArrayType, IntegerType, LongType, StringType, FloatType, TimestampType, StructType, StructField


my_schema = StructType([
                       StructField("id", StringType(), True),
                       StructField("timestamp", TimestampType(), True),
                       StructField("mti", StringType(), True),
                       StructField("card_number", StringType(), True),
                       StructField("amount_customer", FloatType(), True),
                       StructField("merchant_type", StringType(), True),
                       StructField("merchant_id", StringType(), True),
                       StructField("merchant_address", StringType(), True),
                       StructField("ref_id", StringType(), True),
                       StructField("amount_merchant", FloatType(), True),
                       StructField("response_code", StringType(), True),
                      ])

transactions = spark.read.csv("../data/transactions/csv", header=True, schema=my_schema)
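# limit() only accepts an integer, so skip it when limit_rows is 'ALL'
limited = transactions if limit_rows == 'ALL' else transactions.limit(limit_rows)
limited.createOrReplaceTempView("transactions")
limited.createOrReplaceTempView("transactions2")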

transactions.printSchema()

# transactions.show()

## Step-4: Large x Large Join

Join the `transactions` view with `transactions2`, matching the `id` of each request to the `ref_id` of its response.



In [None]:
import pandas as pd

s = """
SELECT transactions.timestamp as request_time, 
transactions2.timestamp as response_time 
from transactions join transactions2 
ON (transactions.id = transactions2.ref_id)
"""

# build the joined DataFrame once and reuse it
joined = spark.sql(s)
joined.show(10, truncate=False)

## Step-5: Calculate Columns

We will convert the request and response timestamps to seconds and compute the elapsed time between them.

In [None]:
from pyspark.sql import functions as F

joined = (joined.withColumn("t1", joined["request_time"].cast("double")))
joined = (joined.withColumn("t2", joined["response_time"].cast("double")))
joined = joined.withColumn('elapsed', (joined['t2'] - joined['t1']))
joined.show(truncate=False)

## Step-6: Run Query

In [None]:
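# elapsed is in seconds (the timestamps were cast to double)
# find responses that came back in under 5 seconds; use 0.05 for the 50 ms target from the Overview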
quick_response = joined.filter ("elapsed < 5")

total_count = joined.count()
quick_response_count = quick_response.count()
non_quick_response_count = total_count - quick_response_count
quick_response_percentage = quick_response_count * 100 / total_count

print ("total_count = {:,}".format (total_count))
print ("quick_response_count = {:,}".format (quick_response_count))
print ("non_quick_response_count = {:,}".format (non_quick_response_count))
print ("quick_response_percentage = {:,.2f} %".format (quick_response_percentage))
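
Each `count()` above is a separate Spark action, so the join is recomputed for each one unless `joined` is cached. As a sketch of one alternative, both counts can be collected in a single aggregation. `percentage` and `quick_response_stats` are hypothetical helper names, and the code assumes the `joined` DataFrame with its `elapsed` column from Step-5.

```python
def percentage(part, total):
    """Percentage of `part` in `total`; 0.0 when there are no rows."""
    return part * 100 / total if total else 0.0


def quick_response_stats(joined, threshold_seconds=5):
    """Count all rows and quick rows in one pass over `joined`."""
    from pyspark.sql import functions as F  # imported lazily; needs PySpark

    # 1 for rows under the threshold, 0 otherwise (including NULL elapsed)
    is_quick = F.when(F.col("elapsed") < threshold_seconds, 1).otherwise(0)
    row = joined.agg(
        F.count(F.lit(1)).alias("total"),
        F.sum(is_quick).alias("quick"),
    ).first()
    total = row["total"]
    quick = row["quick"] or 0  # SUM is NULL when there are no rows
    return total, quick, percentage(quick, total)


# Runs only where the `joined` DataFrame from Step-5 exists (e.g. in this notebook)
if "joined" in globals():
    total, quick, pct = quick_response_stats(joined)
    print("total = {:,}, quick = {:,}, percentage = {:,.2f} %".format(total, quick, pct))
```

The `percentage` helper also guards against division by zero when the join returns no rows.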

## Step-7: Run this query on a Hadoop cluster

Run the same query, using the data on HDFS.

## Step-8: Discussions

In this lab, we performed a large x large join, and we can see that it is expensive.

What techniques can we use to optimize the join? Discuss with your class.
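
As one possible starting point for the discussion, the sketch below reduces both sides before joining: it keeps only request rows on one side and response rows on the other, and selects only the columns the query needs. It assumes that request rows have a null `ref_id` (the lab only states that response rows have a non-null `ref_id`), and it reads from the `transactions` view registered in Step-3. Spark's optimizer may already apply some of this pruning, so compare the plans rather than assuming a speedup.

```python
# Assumption: request rows have ref_id = NULL, response rows have ref_id set.
pruned_sql = """
WITH requests AS (
    SELECT transactions.id, transactions.timestamp AS request_time
    FROM transactions
    WHERE transactions.ref_id IS NULL
),
responses AS (
    SELECT transactions.ref_id, transactions.timestamp AS response_time
    FROM transactions
    WHERE transactions.ref_id IS NOT NULL
)
SELECT requests.request_time, responses.response_time
FROM requests JOIN responses
ON (requests.id = responses.ref_id)
"""

try:
    spark
except NameError:
    # No Spark session here; just show the query text
    print(pruned_sql)
else:
    # Inspect the physical plan and compare it with the plan of the Step-4 query
    spark.sql(pruned_sql).explain()
```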