In [1]:
import findspark
findspark.init("/opt/manual/spark")

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *

In [2]:
spark = (
    SparkSession.builder
    .appName("Spark Bucketing")
    .master("yarn")
    .enableHiveSupport()
    .getOrCreate())

# read market1mil

In [3]:
market1mil = spark.read.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.option("sep", ",") \
.load("/user/train/datasets/market1mil.csv.gz") \
.orderBy(F.rand())

In [21]:
market1mil = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("sep", ";") \
    .option("compression", "gzip") \
    .load("/user/train/datasets/market1mil.csv.gz") \
    .orderBy(F.rand()).withColumn("LOGICALREF", F.col("LOGICALREF").cast(IntegerType()))

In [22]:
market1mil.limit(3).toPandas()

Unnamed: 0,LOGICALREF,COUNT_,ITEMCODE,ITEMNAME,FICHENO,DATE_,AMOUNT,PRICE,LINENETTOTAL,LINENET,...,CLIENTNAME,BRANDCODE,BRAND,CATEGORY_NAME1,CATEGORY_NAME2,CATEGORY_NAME3,STARTDATE,ENDDATE,SPECODE,CAPIBLOCK_CREADEDDATE
0,835552,1,5736,CILEK,211098,1.05.2017 00:00,65,45,293,271,...,Rojin RENÇBER,A25,HAL,MEYVE SEBZE,MEYVE,,2.05.2017 19:24,2.05.2017 19:24,K,14.07.2018 01:48
1,544158,1,1935,DR.OETKER JOLE 100 GR CILEK,142233,23.03.2017 00:00,1,21,21,194,...,Şilan TARIKOĞULLARI,40,DR.OETKER,GIDA,TOZ TATLI,,24.03.2017 09:57,24.03.2017 09:58,K,14.07.2018 01:56
2,952905,1,5692,KARPUZ,238847,17.05.2017 00:00,5945,125,743,688,...,Alperen ELİBOZ,A25,HAL,MEYVE SEBZE,MEYVE,,18.05.2017 12:39,18.05.2017 12:39,E,14.07.2018 02:14


In [23]:
market1mil.printSchema()

root
 |-- LOGICALREF: integer (nullable = true)
 |-- COUNT_: integer (nullable = true)
 |-- ITEMCODE: integer (nullable = true)
 |-- ITEMNAME: string (nullable = true)
 |-- FICHENO: integer (nullable = true)
 |-- DATE_: string (nullable = true)
 |-- AMOUNT: string (nullable = true)
 |-- PRICE: string (nullable = true)
 |-- LINENETTOTAL: string (nullable = true)
 |-- LINENET: string (nullable = true)
 |-- BRANCHNR: integer (nullable = true)
 |-- BRANCH: string (nullable = true)
 |-- SALESMAN: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- CLIENTCODE: string (nullable = true)
 |-- CLIENTNAME: string (nullable = true)
 |-- BRANDCODE: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- CATEGORY_NAME1: string (nullable = true)
 |-- CATEGORY_NAME2: string (nullable = true)
 |-- CATEGORY_NAME3: string (nullable = true)
 |-- STARTDATE: string 

In [10]:
market5mil = spark.read.format("parquet") \
.load("/user/train/datasets/market5mil_parquet") \
.orderBy(F.rand())

In [13]:
market5mil.limit(2).toPandas()

Unnamed: 0,LOGICALREF,COUNT_,ITEMCODE,ITEMNAME,FICHENO,DATE_,AMOUNT,PRICE,LINENETTOTAL,LINENET,...,CLIENTNAME,BRANDCODE,BRAND,CATEGORY_NAME1,CATEGORY_NAME2,CATEGORY_NAME3,STARTDATE,ENDDATE,SPECODE,CAPIBLOCK_CREADEDDATE
0,1900752,1,6981,ETI CIN TEK LOKMALIK PORTAKALLI 112 GR,457893,2017-08-22,1,1.0,1.0,0.93,...,Zeliha ERÇİŞ,44,ETİ,GIDA,BÜSKİVİ ÇEREZ,BÜSKİVİ,2017-08-23 14:33:00,2017-08-23 14:34:19,K,2018-07-14 01:58:56
1,4670731,1,21300,H.SAKIR SAB.BEYAZ 4LU DOGAL,627649,2018-12-17,1,3.45,3.45,2.93,...,Toprak KURSUN,58,H.ŞAKİR,KOZMETİK,DUŞ BANYO,KATI SABUNLAR,2018-12-18 14:36:52,2018-12-18 14:37:14,E,2018-07-14 02:01:48


In [12]:
market5mil.printSchema()

root
 |-- LOGICALREF: integer (nullable = true)
 |-- COUNT_: integer (nullable = true)
 |-- ITEMCODE: string (nullable = true)
 |-- ITEMNAME: string (nullable = true)
 |-- FICHENO: string (nullable = true)
 |-- DATE_: timestamp (nullable = true)
 |-- AMOUNT: integer (nullable = true)
 |-- PRICE: float (nullable = true)
 |-- LINENETTOTAL: float (nullable = true)
 |-- LINENET: float (nullable = true)
 |-- BRANCHNR: string (nullable = true)
 |-- BRANCH: string (nullable = true)
 |-- SALESMAN: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LONGITUDE: float (nullable = true)
 |-- CLIENTCODE: string (nullable = true)
 |-- CLIENTNAME: string (nullable = true)
 |-- BRANDCODE: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- CATEGORY_NAME1: string (nullable = true)
 |-- CATEGORY_NAME2: string (nullable = true)
 |-- CATEGORY_NAME3: string (nullable = true)
 |-- STARTDATE: timestamp (

In [24]:
spark.conf.get("spark.sql.sources.bucketing.enabled")

'true'

# Join

In [27]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [29]:
joined_df = market1mil.join(market5mil, "LOGICALREF") \
.drop(*market1mil.columns)

In [30]:
import time 
start_time = time.time()

joined_df.limit(5).toPandas()

print("--- %s seconds ---" %(time.time()- start_time))

--- 17.546129941940308 seconds ---


In [31]:
joined_df.explain()

== Physical Plan ==
*(5) Project
+- *(5) SortMergeJoin [LOGICALREF#371], [LOGICALREF#141], Inner
   :- *(2) Sort [LOGICALREF#371 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(LOGICALREF#371, 200), true, [id=#296]
   :     +- *(1) Project [cast(LOGICALREF#314 as int) AS LOGICALREF#371]
   :        +- *(1) Filter isnotnull(cast(LOGICALREF#314 as int))
   :           +- FileScan csv [LOGICALREF#314] Batched: false, DataFilters: [isnotnull(cast(LOGICALREF#314 as int))], Format: CSV, Location: InMemoryFileIndex[hdfs://localhost:9000/user/train/datasets/market1mil.csv.gz], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<LOGICALREF:string>
   +- *(4) Sort [LOGICALREF#141 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(LOGICALREF#141, 200), true, [id=#306]
         +- *(3) Project [LOGICALREF#141]
            +- *(3) Filter isnotnull(LOGICALREF#141)
               +- *(3) ColumnarToRow
                  +- FileScan parquet [LOGICALREF#141] Batched: tru

# write hive with buckets

In [36]:
import time 
start_time = time.time()

market1mil.orderBy(F.asc("LOGICALREF")) \
.write.format("parquet") \
.mode("overwrite") \
.bucketBy(8, "LOGICALREF") \
.saveAsTable("market1mil_tbl")

print("--- %s seconds ---" %(time.time()- start_time))

--- 286.9814684391022 seconds ---


In [37]:
import time 
start_time = time.time()

market5mil.orderBy(F.asc("LOGICALREF")) \
.write.format("parquet") \
.mode("overwrite") \
.bucketBy(8, "LOGICALREF") \
.saveAsTable("market5mil_tbl")

print("--- %s seconds ---" %(time.time()- start_time))

--- 415.4062271118164 seconds ---


In [38]:
spark.sql("show tables").show()

+--------+---------------+-----------+
|database|      tableName|isTemporary|
+--------+---------------+-----------+
| default|    advertising|      false|
| default| market1mil_tbl|      false|
| default| market5mil_tbl|      false|
| default|order_items_tbl|      false|
| default|     orders_tbl|      false|
+--------+---------------+-----------+



In [39]:
market1mil_tbl = spark.sql("select * from market1mil_tbl")

In [42]:
market1mil_tbl.limit(5).toPandas()

Unnamed: 0,LOGICALREF,COUNT_,ITEMCODE,ITEMNAME,FICHENO,DATE_,AMOUNT,PRICE,LINENETTOTAL,LINENET,...,CLIENTNAME,BRANDCODE,BRAND,CATEGORY_NAME1,CATEGORY_NAME2,CATEGORY_NAME3,STARTDATE,ENDDATE,SPECODE,CAPIBLOCK_CREADEDDATE
0,12,1,13519,FILIZ MAKARNA KISA KESME 500 GR,15561,2.01.2017 00:00,1,11,11,102,...,Nurgül ZÜLFİKAR,52,FİLİZ,GIDA,MAKARNA,MAKARNA,3.01.2017 09:25,3.01.2017 09:25,K,14.07.2018 02:24
1,13,1,8639,BINGO ULTRA CAM.SUYU 750 ML KLASIK,15561,2.01.2017 00:00,1,245,245,208,...,Nurgül ZÜLFİKAR,224,BİNGO,DETERJAN TEMİZLİK,ÇAMAŞIR YIKAMA,ÇAMAŞIR SULARI,3.01.2017 09:25,3.01.2017 09:25,K,14.07.2018 02:24
2,14,1,6372,DERYA KASAR 250GR,15561,2.01.2017 00:00,1,49,49,454,...,Nurgül ZÜLFİKAR,153,DERYA,SÜT KAHVALTILIK,PEYNİR,KAŞAR PEYNİRİ,3.01.2017 09:25,3.01.2017 09:25,K,14.07.2018 02:24
3,18,1,5461,KIVIRCIK,15563,2.01.2017 00:00,1,125,125,116,...,Ege YOSUNLUKAYA,A25,HAL,MEYVE SEBZE,SEBZE,,3.01.2017 09:35,3.01.2017 09:36,E,14.07.2018 02:17
4,38,1,5362,SİHİRLİ ELLER CİG KÖFTE 200GR,15572,2.01.2017 00:00,2,44,88,815,...,Hasan ÖZYOL,346,SİHİRLİ ELLER,GIDA,HAZIR YEMEKLER,MEZE,3.01.2017 09:56,3.01.2017 09:56,E,14.07.2018 01:42


In [43]:
market5mil_tbl = spark.sql("select * from market5mil_tbl")

In [44]:
market5mil_tbl.limit(5).toPandas()

Unnamed: 0,LOGICALREF,COUNT_,ITEMCODE,ITEMNAME,FICHENO,DATE_,AMOUNT,PRICE,LINENETTOTAL,LINENET,...,CLIENTNAME,BRANDCODE,BRAND,CATEGORY_NAME1,CATEGORY_NAME2,CATEGORY_NAME3,STARTDATE,ENDDATE,SPECODE,CAPIBLOCK_CREADEDDATE
0,12,1,13519,FILIZ MAKARNA KISA KESME 500 GR,15561,2017-01-02,1,1.1,1.1,1.02,...,Nurgül ZÜLFİKAR,52,FİLİZ,GIDA,MAKARNA,MAKARNA,2017-01-03 09:25:03,2017-01-03 09:25:43,K,2018-07-14 02:24:47
1,13,1,8639,BINGO ULTRA CAM.SUYU 750 ML KLASIK,15561,2017-01-02,1,2.45,2.45,2.08,...,Nurgül ZÜLFİKAR,224,BİNGO,DETERJAN TEMİZLİK,ÇAMAŞIR YIKAMA,ÇAMAŞIR SULARI,2017-01-03 09:25:03,2017-01-03 09:25:43,K,2018-07-14 02:24:47
2,14,1,6372,DERYA KASAR 250GR,15561,2017-01-02,1,4.9,4.9,4.54,...,Nurgül ZÜLFİKAR,153,DERYA,SÜT KAHVALTILIK,PEYNİR,KAŞAR PEYNİRİ,2017-01-03 09:25:03,2017-01-03 09:25:43,K,2018-07-14 02:24:47
3,18,1,5461,KIVIRCIK,15563,2017-01-02,1,1.25,1.25,1.16,...,Ege YOSUNLUKAYA,A25,HAL,MEYVE SEBZE,SEBZE,,2017-01-03 09:35:35,2017-01-03 09:36:10,E,2018-07-14 02:17:23
4,38,1,5362,SİHİRLİ ELLER CİG KÖFTE 200GR,15572,2017-01-02,2,4.4,8.8,8.15,...,Hasan ÖZYOL,346,SİHİRLİ ELLER,GIDA,HAZIR YEMEKLER,MEZE,2017-01-03 09:56:48,2017-01-03 09:56:56,E,2018-07-14 01:42:33


In [45]:
joined_df2 = market1mil_tbl.join(market5mil_tbl,"LOGICALREF") \
.drop(*market1mil_tbl.columns)

In [46]:
import time 
start_time = time.time()

joined_df2.limit(5).toPandas()

print("--- %s seconds ---" %(time.time()- start_time))

--- 9.446430683135986 seconds ---


In [47]:
spark.stop()