In [4]:
## Mounting google drive with the backend data
import os
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [6]:
!pip install pyspark 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 55.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=61b7ff9b1bbce814635d72bbc5debc414cec3dac9f8e5194042ce4cc5ca1240c
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [8]:
#Initializing Spark
import os
import sys
spark_home = '/usr/local/spark' # point to your spark home dir
os.environ['SPARK_home'] = spark_home
sys.path.insert(0, spark_home + "/python")
os.environ['PYSPARK_PYTHON'] = 'python3.5'


confList = [
    #["spark.streaming.unpersist", "true"]           # Automated memory management for Streaming apps
    ["spark.driver.cores","16"]
    ,["spark.driver.memory","56g"]
    ,["spark.driver.maxResultSize", "500g"]
    ,["spark.executor.memory","80g"]               # Amount of memory for each executor (shared by all tasks in the executor)
    ,["spark.mesos.coarse", "true"]                 # Accept and hold a fixed amount of resource from Mesos
    ,["spark.dynamicAllocation.enabled","true"]
    ,["spark.dynamicAllocation.initialExecutors","50"]
    ,["spark.dynamicAllocation.minExecutors","50"]
    ,["spark.storage.memoryMapThreshold","20g"]
#    ,["spark.serializer","org.apache.spark.serializer.KryoSerializer"] #Define Kryo as Base Serializer
#    ,["spark.kryoserializer.buffer","128k"]         #Define base memory per node
#    ,["spark.kryoserializer.buffer.max","2047"]  #Define max value that Kryo Serializer can run upto
    ,["spark.rdd.compress", "true"]                 # Enabling RDD compress to optimize storeage space
#    ,["spark.cores.max",32]                      # Defines the number of cores
    ,["spark.executor.heartbeatInterval", "10000s"]
    ,["spark.network.timeout", "11000s"]
    ,["spark.shuffle.io.connectionTimeout","10000s"]
    #,["spark.task.cpus", "4"]
    ,["spark.rpc.numRetries", "100"]
    ,["spark.task.maxFailures", "10"]
    ,["spark.executors.max","40"]
    ,["spark.memory.fraction","0.8"]
#    ,["spark.shuffle.io.connectionTimeout","120s"]
    ,["spark.shuffle.file.buffer","128k"]
    ,["spark.shuffle.compress","true"]
    ,["spark.shuffle.spill.compress","true"]
]
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row
SPARK_CONF = SparkConf().setAppName("Argo_data").setAll(confList)
spark = SparkSession.builder    .config(conf=SPARK_CONF)     .enableHiveSupport().getOrCreate()
print("Spark session initialized")

spark.sql('set hive.exec.dynamic.partition.mode=nonstrict')
spark.sql('set    hive.tez.container.size=40240')
spark.sql('set    hive.tez.java.opts=-Xmx32000m')
spark.sql('set    hive.optimize.skewjoin=true')
spark.sql('set    hive.skewjoin.key=25000000')
spark.sql('set    hive.exec.reducers.max=10000')
spark.sql('set    hive.exec.reducers.bytes.per.reducer=128000000')
spark.sql('set    tez.runtime.shuffle.max.allowed.failed.fetch.fraction=0.95')
#spark.sql('set    tez.runtime.shuffle.failed.check.since-last.completion=false')
spark.sql('set    tez.runtime.shuffle.fetch.buffer.percent=0.1')
spark.sql('set    tez.runtime.shuffle.memory.limit.percent=0.25')
spark.sql('set    hive.exec.max.dynamic.partitions=500000')
spark.sql('set    hive.exec.max.dynamic.partitions.pernode=500000')

Spark session initialized


DataFrame[key: string, value: string]

**User History Data**

In [12]:
## Reading Backend Data
df=spark.read.csv('/content/drive/MyDrive/Backend Data CSVs/user_history_data.csv',header=True)

In [14]:
## Storing backend data into a temporary View
df.createOrReplaceTempView('user_history_data')

In [20]:
## Looking at the table
df.show()

+---+-------+--------------+--------------+---------------+--------------------+--------+--------------------+--------------------+--------------------+-----------+
| id|user_id|    first_name|     last_name|       username|               email|provider|       creation_date|       session_start|         session_end|start_depth|
+---+-------+--------------+--------------+---------------+--------------------+--------+--------------------+--------------------+--------------------+-----------+
|  1|   null|       Matthew|      Lowinger|          mlow |matthew.lowinger@...|   email|2019-08-16 12:28:...|2020-01-06 18:07:...|                null|       null|
|  2|   null|       Matthew|      Lowinger|          mlow |matthew.lowinger@...|   email|2019-08-16 12:28:...|2020-01-06 18:07:...|2020-01-06 18:07:...|       null|
|  3|   null|       Matthew|      Lowinger|          mlow |matthew.lowinger@...|   email|2019-08-16 12:28:...|2020-01-06 18:08:...|2020-01-06 18:08:...|       null|
|  4|   nu

In [18]:
## Checking for nulls in userid
spark.sql("""select count(*),sum(case when user_id is null then 1 else 0 end) from user_history_data""").show()

+--------+--------------------------------------------------+
|count(1)|sum(CASE WHEN (user_id IS NULL) THEN 1 ELSE 0 END)|
+--------+--------------------------------------------------+
| 1207441|                                              4198|
+--------+--------------------------------------------------+



In [21]:
4198/1207441

0.0034767744345272356

In [27]:
spark.sql("""select distinct creation_date from user_history_data where user_id=7501""").show()

+--------------------+
|       creation_date|
+--------------------+
|2020-08-30 12:48:...|
+--------------------+



In [35]:
dfp=df.toPandas()

In [39]:
dfp.describe()

Unnamed: 0,id,user_id,first_name,last_name,username,email,provider,creation_date,session_start,session_end,start_depth
count,1207441,1203243,1207441,1207441,1207441,1207441,1207441,1207441,1207441,690048,1036720.0
unique,1207441,42444,8847,23354,43266,42475,4,42544,1038093,660503,33436.0
top,1,54,John,Foulk,Argo,jeff@argonav.io,email,2019-05-17 15:46:27 UTC,2021-09-08 01:14:08 UTC,2021-09-09 15:11:38 UTC,0.0
freq,1,11396,22994,13129,11365,11396,473158,11396,34,25,732289.0


In [40]:
dfp.isnull().sum(axis=0)

id                    0
user_id            4198
first_name            0
last_name             0
username              0
email                 0
provider              0
creation_date         0
session_start         0
session_end      517393
start_depth      170721
dtype: int64

In [41]:
dfp.count()

id               1207441
user_id          1203243
first_name       1207441
last_name        1207441
username         1207441
email            1207441
provider         1207441
creation_date    1207441
session_start    1207441
session_end       690048
start_depth      1036720
dtype: int64

In [42]:
517393/1207441

0.42850375297840637

In [49]:
170721/1207441

0.1413907594656799

In [48]:
spark.sql("""select count(*) from user_history_data where start_depth is null""").show()

+--------+
|count(1)|
+--------+
|  170721|
+--------+



In [50]:
spark.sql("""select count(*) from user_history_data where start_depth=0""").show()

+--------+
|count(1)|
+--------+
|  811050|
+--------+



In [51]:
811050/1207441

0.6717098392385218

In [47]:
225278/1207441

0.18657474775164998

**User Data**

In [52]:
## Reading Backend Data
df=spark.read.csv('/content/drive/MyDrive/Backend Data CSVs/user_data.csv',header=True)

In [53]:
## Storing backend data into a temporary View
df.createOrReplaceTempView('user_data')

In [54]:
df.show()

+---+--------+--------------------+--------------------+--------------------+----------------------+---------------------+-------------------+-------------+--------------------+--------------------+------------------+---------------+--------------------+--------------------+--------------------+-----------------+----------+-----------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+
| id|provider|                 uid|  encrypted_password|reset_password_token|reset_password_sent_at|allow_password_change|remember_created_at|sign_in_count|  current_sign_in_at|     last_sign_in_at|current_sign_in_ip|last_sign_in_ip|  confirmation_token|        confirmed_at|confirmation_sent_at|unconfirmed_email|first_name|  last_name|         username|           

In [56]:
df.columns

['id',
 'provider',
 'uid',
 'encrypted_password',
 'reset_password_token',
 'reset_password_sent_at',
 'allow_password_change',
 'remember_created_at',
 'sign_in_count',
 'current_sign_in_at',
 'last_sign_in_at',
 'current_sign_in_ip',
 'last_sign_in_ip',
 'confirmation_token',
 'confirmed_at',
 'confirmation_sent_at',
 'unconfirmed_email',
 'first_name',
 'last_name',
 'username',
 'image',
 'email',
 'tokens',
 'created_at',
 'updated_at',
 'push_token',
 'lat',
 'lng',
 'heading',
 'depth',
 'blocked',
 'deleted_at',
 'reset_password_token_base']