In [1]:
# My Standard Spark Session!

# Python libraries:
# `builtins` keeps Python's built-ins reachable after `from pyspark.sql.functions import *`
# (below) shadows names such as sum, max, min, abs and round.
import builtins
import functools
import json
import operator
import os
import random  # used to draw car makes when building the sample data
import re
import sys
from datetime import date
from datetime import datetime
# import datetime
from itertools import product

from dateutil import parser

# Numpy & Pandas!
import numpy as np
import pandas as pd

# Notebook-wide pandas display settings: floats rendered with thousands separators and
# two decimals in a 20-character field, no limit on displayed columns, and cell text
# capped at 80 characters.
pd.set_option('display.float_format', '{:20,.2f}'.format)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 80)


# Spark!
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql import SparkSession, Row


spark = SparkSession.builder.appName("myapp").getOrCreate()

# YARN cluster configuration, kept for reference; use it in place of the builder
# above when running on the cluster.
#     spark = SparkSession.builder.master("yarn")\
#     .config("spark.executor.instances", "32")\
#     .config("spark.executor.cores", "4")\
#     .config("spark.executor.memory", "4G")\
#     .config("spark.driver.memory", "4G")\
#     .config("spark.executor.memoryOverhead","4G")\
#     .config("spark.yarn.queue","Medium")\
#     .appName("myapp")\
#     .getOrCreate()

sc = spark.sparkContext
# Do not infer types for partition columns of partitioned sources; treat them as strings.
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
# maxToStringFields is an integer limit on how many fields are rendered in debug/plan
# strings; the previous value "true" is not a valid integer.
# NOTE(review): depending on the Spark version this key may be read from the SparkContext
# conf (so it must be passed to the builder via .config(...)) or be named
# spark.sql.debug.maxToStringFields -- confirm it actually takes effect.
spark.conf.set("spark.debug.maxToStringFields", "100")

In [2]:
%load_ext autoreload
%autoreload 2
# The autoreload extension is already loaded. To reload it, use:
#  %reload_ext autoreload


# mylib:
my_library = os.path.expanduser('~/.myconfigs')
my_spark = os.path.expanduser('~/spark2_dfanalysis')
sys.path.append(my_library)
sys.path.append(my_spark)


from shared.app_context import *
from builder.DataFrameBuild import *

ctx = ApplicationContext("Dev-Job")

DFB = DataFrameBuild(ctx.spark)

print("%16s  %s" % ("Python Version:",sys.version))
print("%16s  %s" % ("Python Path:",os.path.dirname(sys.executable)))
print("%16s  %s" % ("My Python Libs:",my_library))
print("%16s  %s" % ("My Spark Dir:",my_spark))
print("%16s  %s" % ("My Spark Ctx:",ctx.spark))
# print(ctx.spark)
# print(os.listdir(my_spark))
# print(sys.path)
# print("\n")

 Python Version:  3.6.1 |Anaconda 4.4.0 (64-bit)| (default, May 11 2017, 13:25:24) [MSC v.1900 64 bit (AMD64)]
    Python Path:  C:\Users\d810216\AppData\Local\conda\conda\envs\my_root
 My Python Libs:  C:\Users\d810216/.myconfigs
   My Spark Dir:  C:\Users\d810216/spark2_dfanalysis
   My Spark Ctx:  <pyspark.sql.session.SparkSession object at 0x0000026049070518>


### Check partitioning

In [3]:
# Count the rows in a single partition.
def count_in_a_partition(idx, iterator):
    """Count the rows in one partition; meant for ``rdd.mapPartitionsWithIndex``.

    Parameters
    ----------
    idx : int
        Partition index supplied by Spark.
    iterator : iterable
        The partition's rows; consumed entirely.

    Returns
    -------
    tuple
        ``(idx, row_count)``. mapPartitionsWithIndex flattens whatever iterable the
        function returns, so collecting the results gives a flat list
        ``[idx0, count0, idx1, count1, ...]`` rather than a list of pairs.
    """
    seen = 0
    for seen, _row in enumerate(iterator, start=1):
        pass
    return idx, seen

# Partition count, total rows, and rows per partition of a DataFrame.
def df_partition_count(df):
    """Print the partition and row counts of ``df`` and return the rows per partition.

    Parameters
    ----------
    df : Spark DataFrame
        Any object exposing ``.rdd`` with ``getNumPartitions()`` and
        ``mapPartitionsWithIndex(...).collect()``.

    Returns
    -------
    numpy.ndarray
        Shape ``(num_partitions, 2)``; each row is ``[partition_index, row_count]``.
    """
    print("Number of Partitions:", df.rdd.getNumPartitions())
    # count_in_a_partition returns an (idx, count) tuple, which mapPartitionsWithIndex
    # flattens into one list: [0, n0, 1, n1, ...]. reshape(-1, 2) turns it back into pairs
    # (and gives shape (0, 2) when there are no partitions).
    flat_counts = df.rdd.mapPartitionsWithIndex(count_in_a_partition).collect()
    per_partition = np.array(flat_counts).reshape(-1, 2)
    # The total is the sum of the per-partition counts. This avoids running a separate
    # df.count() job over the same data.
    print("count: ", int(per_partition[:, 1].sum()))
    return per_partition

## Sample DataFrames

#### df1: 25,000 car sales, each with an index, a 3-letter VIN, a division, a sale timestamp (year, month, day, hour, minute, second), and a price
#### df2: the same 25,000 cars (same `index` values), with only the make

In [4]:
# Number of rows in each of df1 and df2.
num = 25000

# Seed Python's `random` so the makes drawn for df2 are reproducible across runs.
# NOTE(review): DFB.build_array's random source is not visible here; seeding only makes
# df1 reproducible if it draws from Python's `random` module -- confirm.
random.seed(42)

# df1: one synthetic sale per row; `index` runs 1..num and is the join key with df2.
df1 = DFB.arrays_to_dataframe(
    [list(range(1, num + 1)),                                     # index
     DFB.build_array("string", num=num, width=3),                 # vin
     DFB.build_array("integer", num=num, nrange=(100, 999)),      # division
     DFB.build_array("integer", num=num, nrange=(1, 12)),         # month
     DFB.build_array("integer", num=num, nrange=(1, 28)),         # day
     DFB.build_array("integer", num=num, nrange=(2010, 2019)),    # year
     DFB.build_array("integer", num=num, nrange=(0, 23)),         # hour
     DFB.build_array("integer", num=num, nrange=(0, 59)),         # minute
     DFB.build_array("integer", num=num, nrange=(0, 59)),         # second
     DFB.build_array("double", num=num, nrange=(1000, 100000))],  # price
    ['index', 'vin', 'division', 'month', 'day', 'year', 'hour', 'minute', 'second', 'price'])


# df2: the same `index` values, each paired with a randomly chosen make.
car_makes = ['Honda', 'Toyota', 'Chevy', 'Ford', 'Tesla', 'Volkswagon', 'Hyundai', 'Jeep']
lst_cars = [random.choice(car_makes) for _ in range(num)]


df2 = DFB.arrays_to_dataframe(
    [list(range(1, num + 1)),
     lst_cars],
    ['index', 'make'])

### Join df1 and df2 on `index` (the join itself is lazy; the work runs at the first action, e.g. `count()`)

In [5]:
%%time
df_sales = df1.alias("a")\
.join(df2.alias("b"),col("a.index") == col("b.index"),"inner")\
.drop(col("b.index"))

Wall time: 182 ms


In [6]:
%%time
print(df_sales.count())

25000
Wall time: 30.7 s


In [7]:
%%time
df1b = df1.coalesce(1)
df2b = df2.coalesce(1)

df_sales2 = df1b.alias("a")\
.join(df2b.alias("b"),col("a.index") == col("b.index"),"inner")\
.drop(col("b.index"))

df_sales2.count()

Wall time: 10.7 s


In [8]:
# Partition layout of df1: one [partition_index, row_count] row per partition.
arr1 = df_partition_count(df1)
print(arr1)

Number of Partitions: 4
count:  25000
[[   0 6144]
 [   1 6144]
 [   2 6144]
 [   3 6568]]


In [9]:
# Partition layout of df2 (index + make): one [partition_index, row_count] row per partition.
df_partition_count(df2)

Number of Partitions: 4
count:  25000


array([[   0, 6144],
       [   1, 6144],
       [   2, 6144],
       [   3, 6568]])

In [10]:
# Partition layout of the joined df_sales; compare its partition count with df1/df2's.
df_partition_count(df_sales)

Number of Partitions: 200
count:  25000


array([[  0, 112],
       [  1, 109],
       [  2, 117],
       [  3, 110],
       [  4, 130],
       [  5, 136],
       [  6, 132],
       [  7, 123],
       [  8, 112],
       [  9, 131],
       [ 10, 118],
       [ 11, 133],
       [ 12, 130],
       [ 13, 136],
       [ 14, 135],
       [ 15, 129],
       [ 16, 118],
       [ 17, 118],
       [ 18, 116],
       [ 19, 128],
       [ 20, 130],
       [ 21, 124],
       [ 22, 109],
       [ 23, 129],
       [ 24, 121],
       [ 25, 129],
       [ 26, 107],
       [ 27, 128],
       [ 28, 132],
       [ 29, 126],
       [ 30, 112],
       [ 31, 130],
       [ 32, 139],
       [ 33, 128],
       [ 34, 104],
       [ 35, 140],
       [ 36, 115],
       [ 37, 123],
       [ 38, 126],
       [ 39, 130],
       [ 40, 118],
       [ 41, 126],
       [ 42, 114],
       [ 43, 121],
       [ 44, 128],
       [ 45, 126],
       [ 46, 108],
       [ 47, 131],
       [ 48, 142],
       [ 49, 135],
       [ 50, 140],
       [ 51, 105],
       [ 52,

In [11]:
# df1b is df1 after coalesce(1): check that all rows sit in a single partition.
df_partition_count(df1b)

Number of Partitions: 1
count:  25000


array([[    0, 25000]])

In [12]:
# df2b is df2 after coalesce(1): check that all rows sit in a single partition.
df_partition_count(df2b)

Number of Partitions: 1
count:  25000


array([[    0, 25000]])

In [13]:
# Partition layout of the join of the two coalesced (single-partition) inputs.
df_partition_count(df_sales2)

Number of Partitions: 1
count:  25000


array([[    0, 25000]])

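### A small (and not fully convincing) improvement from coalescing before the join
Partition count of the joined DataFrame:
- 1 partition joined with 1 partition -> 1 partition
- 4 partitions joined with 4 partitions -> 200 partitions (200 matches Spark's default `spark.sql.shuffle.partitions`)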

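At 25,000 rows:
- (join, count) vs. (coalesce, join, count): ~20–30 s vs. ~9 s (the run recorded above: 30.7 s vs. 10.7 s)
- i.e., counting over 200 partitions vs. 1 partition. Hypothesis: with this little data, per-task overhead dominates, so fewer partitions count faster.
- Caveat: the df_sales count appears to be the first Spark action in the session, so its time may include one-off startup costs; rerun the two timings in the opposite order to confirm.

At 100,000 rows:
- Same result: ~20–30 s vs. ~9 s.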

In [14]:
# Preview 8 joined rows as pandas. limit() without an orderBy does not fix which rows come
# back, so these index values need not match the df_sales2 preview.
df_sales.limit(8).toPandas()

Unnamed: 0,index,vin,division,month,day,year,hour,minute,second,price,make
0,26,nic,107,9,20,2010,2,53,35,10376.0,Tesla
1,29,yvq,670,10,11,2012,17,28,27,34556.88,Volkswagon
2,474,oem,134,10,14,2015,4,32,8,67254.91,Tesla
3,964,phd,372,2,3,2010,22,33,5,12598.23,Tesla
4,1677,zxp,925,3,6,2011,2,49,55,72063.22,Ford
5,1697,xad,184,7,15,2010,7,39,11,21221.2,Chevy
6,1806,mcq,529,5,3,2019,4,14,13,73160.28,Toyota
7,1950,gfw,168,7,23,2010,3,41,20,52730.97,Honda


In [15]:
# Preview 8 rows of the coalesced join. Row selection is unordered (no orderBy), so the
# rows shown can differ from the df_sales preview.
df_sales2.limit(8).toPandas()

Unnamed: 0,index,vin,division,month,day,year,hour,minute,second,price,make
0,1,xqo,500,4,10,2018,17,6,9,35072.31,Volkswagon
1,2,zfz,527,8,14,2019,20,54,42,81569.03,Jeep
2,3,cmc,999,2,15,2012,14,43,22,70133.12,Hyundai
3,4,mxf,224,7,26,2015,6,25,40,64560.72,Jeep
4,5,sck,100,11,1,2016,20,10,17,34551.97,Jeep
5,6,usp,995,5,25,2010,19,9,32,22132.98,Ford
6,7,bho,408,8,10,2016,23,53,54,52308.26,Tesla
7,8,jyv,743,3,16,2014,12,50,56,81296.68,Toyota


### Create a composite key column (`<make>_<vin>_<year>`); a helper for this probably already exists elsewhere, and uniqueness is not checked here

In [16]:
# Composite key "<make>_<vin>_<year>", e.g. "Tesla_nic_2010".
# NOTE(review): concat() returns null if any part is null, and nothing here checks
# uniqueness. VINs are only 3 characters wide (width=3 when df1 is built), so duplicate
# keys look possible; verify before treating `key` as an identifier.
unique_id = [
    col('make'), lit('_'),
    col('vin'), lit('_'),
    col('year'),
]

df_id = df_sales.select('*', concat(*unique_id).alias('key'))

In [17]:
# Preview 8 rows including the new `key` column (row selection is unordered: no orderBy).
df_id.limit(8).toPandas()

Unnamed: 0,index,vin,division,month,day,year,hour,minute,second,price,make,key
0,26,nic,107,9,20,2010,2,53,35,10376.0,Tesla,Tesla_nic_2010
1,29,yvq,670,10,11,2012,17,28,27,34556.88,Volkswagon,Volkswagon_yvq_2012
2,474,oem,134,10,14,2015,4,32,8,67254.91,Tesla,Tesla_oem_2015
3,964,phd,372,2,3,2010,22,33,5,12598.23,Tesla,Tesla_phd_2010
4,1677,zxp,925,3,6,2011,2,49,55,72063.22,Ford,Ford_zxp_2011
5,1697,xad,184,7,15,2010,7,39,11,21221.2,Chevy,Chevy_xad_2010
6,1806,mcq,529,5,3,2019,4,14,13,73160.28,Toyota,Toyota_mcq_2019
7,1950,gfw,168,7,23,2010,3,41,20,52730.97,Honda,Honda_gfw_2010
