The notebook follows subham kharwal's examples from the repo https://github.com/subhamkharwal/ease-with-apache-spark

In [1]:
import pyspark
import warnings

warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
spark = SparkSession. \
    builder. \
    appName("creating spark"). \
    getOrCreate()

23/01/24 18:36:14 WARN Utils: Your hostname, codeStation resolves to a loopback address: 127.0.1.1; using 192.168.88.83 instead (on interface wlo1)
23/01/24 18:36:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/24 18:36:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
df_range = spark.range(5)
df_range.show(truncate=False)

+---+
|id |
+---+
|0  |
|1  |
|2  |
|3  |
|4  |
+---+



In [7]:
df_range_se = spark.range(start=0,end=10,step=2,numPartitions=2)
df_range_se.show()

+---+
| id|
+---+
|  0|
|  2|
|  4|
|  6|
|  8|
+---+



In [8]:
# Creating native dataframe with schema
_data = [
    [1,"Spark"],
    [2,"Hive"],
    [3,"pig"],
    [4,"Sqoop"]
]

_cols = ["id","software"]

In [9]:
df_wares = spark.createDataFrame(data=_data,schema=_cols)
df_wares.show()

                                                                                

+---+--------+
| id|software|
+---+--------+
|  1|   Spark|
|  2|    Hive|
|  3|     pig|
|  4|   Sqoop|
+---+--------+



In [10]:
_data_rdd = spark.sparkContext.parallelize(_data)
_data_rdd.collect()

[[1, 'Spark'], [2, 'Hive'], [3, 'pig'], [4, 'Sqoop']]

In [11]:
type(_data_rdd)

pyspark.rdd.RDD

In [12]:
type(_data)

list

In [13]:
type(df_wares)

pyspark.sql.dataframe.DataFrame

In [15]:
_data_rdd.toDF(schema=_cols).show()

+---+--------+
| id|software|
+---+--------+
|  1|   Spark|
|  2|    Hive|
|  3|     pig|
|  4|   Sqoop|
+---+--------+



In [16]:
#parsing string data_types and creating schemas out of it...

from pyspark.sql.types import _parse_datatype_json_string
from pyspark.sql.types import _parse_datatype_json_value
from pyspark.sql.types import _parse_datatype_string

In [17]:
_schema = 'id int, name string'
new_schema  = _parse_datatype_string(_schema)
new_schema

StructType([StructField('id', IntegerType(), True), StructField('name', StringType(), True)])

In [19]:
_komplex_schema = "id int, name map<string, string>, subject array<string>"
_komplex = _parse_datatype_string(_komplex_schema)
_komplex

StructType([StructField('id', IntegerType(), True), StructField('name', MapType(StringType(), StringType(), True), True), StructField('subject', ArrayType(StringType(), True), True)])

In [20]:
### How the complex schema will be used in practice

In [21]:
#Working with apis

import requests, json

def get_apiData(url: str):
    normalised_data = dict()
    data = requests.get(url).json()
    normalised_data['_data'] = data
    return json.dumps(normalised_data)

In [22]:
api_url = r"https://api.coindesk.com/v1/bpi/currentprice.json"

In [33]:
api_url_wzr = "https://api.wazirx.com/sapi/v1/tickers/24hr"

In [23]:
coindex = get_apiData(api_url)

In [34]:
wzr_dex = get_apiData(api_url_wzr)

In [25]:
dexData = json.loads(coindex)
dexData

{'_data': {'time': {'updated': 'Jan 24, 2023 13:50:00 UTC',
   'updatedISO': '2023-01-24T13:50:00+00:00',
   'updateduk': 'Jan 24, 2023 at 13:50 GMT'},
  'disclaimer': 'This data was produced from the CoinDesk Bitcoin Price Index (USD). Non-USD currency data converted using hourly conversion rate from openexchangerates.org',
  'chartName': 'Bitcoin',
  'bpi': {'USD': {'code': 'USD',
    'symbol': '&#36;',
    'rate': '22,857.1066',
    'description': 'United States Dollar',
    'rate_float': 22857.1066},
   'GBP': {'code': 'GBP',
    'symbol': '&pound;',
    'rate': '19,099.2154',
    'description': 'British Pound Sterling',
    'rate_float': 19099.2154},
   'EUR': {'code': 'EUR',
    'symbol': '&euro;',
    'rate': '22,266.1590',
    'description': 'Euro',
    'rate_float': 22266.159}}}}

In [40]:
ext_wzr = json.loads(wzr_dex)
ext_wzr["_data"][0]

{'symbol': 'btcinr',
 'baseAsset': 'btc',
 'quoteAsset': 'inr',
 'openPrice': '1928806',
 'lowPrice': '1821009.0',
 'highPrice': '1944606.0',
 'lastPrice': '1924855.0',
 'volume': '21.30043',
 'bidPrice': '1924000.0',
 'askPrice': '1925723.0',
 'at': 1674569056000}

In [41]:
ext_rdd = spark.sparkContext.parallelize([ext_wzr["_data"]])

In [27]:
dexRdd = spark.sparkContext.parallelize([dexData])
dexRdd.collect()

[{'_data': {'time': {'updated': 'Jan 24, 2023 13:50:00 UTC',
    'updatedISO': '2023-01-24T13:50:00+00:00',
    'updateduk': 'Jan 24, 2023 at 13:50 GMT'},
   'disclaimer': 'This data was produced from the CoinDesk Bitcoin Price Index (USD). Non-USD currency data converted using hourly conversion rate from openexchangerates.org',
   'chartName': 'Bitcoin',
   'bpi': {'USD': {'code': 'USD',
     'symbol': '&#36;',
     'rate': '22,857.1066',
     'description': 'United States Dollar',
     'rate_float': 22857.1066},
    'GBP': {'code': 'GBP',
     'symbol': '&pound;',
     'rate': '19,099.2154',
     'description': 'British Pound Sterling',
     'rate_float': 19099.2154},
    'EUR': {'code': 'EUR',
     'symbol': '&euro;',
     'rate': '22,266.1590',
     'description': 'Euro',
     'rate_float': 22266.159}}}}]

In [44]:
ext_rdd_df = spark.read.json(ext_rdd)
ext_rdd_df.printSchema()

root
 |-- askPrice: string (nullable = true)
 |-- at: long (nullable = true)
 |-- baseAsset: string (nullable = true)
 |-- bidPrice: string (nullable = true)
 |-- highPrice: string (nullable = true)
 |-- lastPrice: string (nullable = true)
 |-- lowPrice: string (nullable = true)
 |-- openPrice: string (nullable = true)
 |-- quoteAsset: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- volume: string (nullable = true)



In [31]:
spark_df = spark.read.json(dexRdd)
spark_df.select("_data").printSchema()

root
 |-- _data: struct (nullable = true)
 |    |-- bpi: struct (nullable = true)
 |    |    |-- EUR: struct (nullable = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- rate: string (nullable = true)
 |    |    |    |-- rate_float: double (nullable = true)
 |    |    |    |-- symbol: string (nullable = true)
 |    |    |-- GBP: struct (nullable = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- rate: string (nullable = true)
 |    |    |    |-- rate_float: double (nullable = true)
 |    |    |    |-- symbol: string (nullable = true)
 |    |    |-- USD: struct (nullable = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- rate: string (nullable = true)
 |    |    |    |-- rate_float: double (nullable = true)
 |    |    |    |-- symbol

In [50]:
spark_df.select("_data.*").show()

+--------------------+---------+--------------------+--------------------+
|                 bpi|chartName|          disclaimer|                time|
+--------------------+---------+--------------------+--------------------+
|{{EUR, Euro, 22,2...|  Bitcoin|This data was pro...|{Jan 24, 2023 13:...|
+--------------------+---------+--------------------+--------------------+



In [52]:
spark_df.select("_data.*").select("bpi.*").select("GBP.*").show()

+----+--------------------+-----------+----------+-------+
|code|         description|       rate|rate_float| symbol|
+----+--------------------+-----------+----------+-------+
| GBP|British Pound Ste...|19,099.2154|19099.2154|&pound;|
+----+--------------------+-----------+----------+-------+



In [47]:
ext_rdd_df.select("symbol","bidPrice","lowPrice").show()a

+--------+---------+---------+
|  symbol| bidPrice| lowPrice|
+--------+---------+---------+
|  btcinr|1924000.0|1821009.0|
|  xrpinr|    35.51|  35.1409|
|  ethinr| 135969.7| 135000.0|
|  trxinr|     5.25|      5.2|
|  eosinr|     89.1|    87.96|
|  zilinr|     2.37|      2.3|
|  batinr|   21.805|     21.0|
|  zrxinr|     19.9|     19.0|
|  reqinr|   9.2009|     9.11|
|  icxinr|   21.538|     16.5|
|  omginr|  120.105|  115.001|
| iostinr|     0.85|      0.8|
| dentinr|    0.076|    0.075|
|  hotinr|    0.178|    0.174|
| usdtinr|    84.75|    83.61|
|  wrxinr|     16.2|    15.56|
|maticinr|   84.401|   82.521|
|  bchinr|  11012.0|  10908.0|
|  bnbinr|  26516.0|  25409.0|
|  oneinr|   1.5751|     1.52|
+--------+---------+---------+
only showing top 20 rows



In [54]:
#example where the columns have json data

_jdata = [
    ['EMP001', '{"dept" : "account", "fname": "Ramesh", "lname": "Singh", "skills": ["excel", "tally", "word"]}'],
    ['EMP002', '{"dept" : "sales", "fname": "Siv", "lname": "Kumar", "skills": ["biking", "sales"]}'],
    ['EMP003', '{"dept" : "hr", "fname": "MS Raghvan", "skills": ["communication", "soft-skills"]}']
]


In [56]:
_jdata[0]

['EMP001',
 '{"dept" : "account", "fname": "Ramesh", "lname": "Singh", "skills": ["excel", "tally", "word"]}']

In [57]:
_colsJ = ['emp_no','raw_data']

df_jraw = spark.createDataFrame(_jdata,schema=_colsJ)

df_jraw.printSchema()

root
 |-- emp_no: string (nullable = true)
 |-- raw_data: string (nullable = true)



In [59]:
df_jraw.show(truncate=False)

+------+-----------------------------------------------------------------------------------------------+
|emp_no|raw_data                                                                                       |
+------+-----------------------------------------------------------------------------------------------+
|EMP001|{"dept" : "account", "fname": "Ramesh", "lname": "Singh", "skills": ["excel", "tally", "word"]}|
|EMP002|{"dept" : "sales", "fname": "Siv", "lname": "Kumar", "skills": ["biking", "sales"]}            |
|EMP003|{"dept" : "hr", "fname": "MS Raghvan", "skills": ["communication", "soft-skills"]}             |
+------+-----------------------------------------------------------------------------------------------+



In [64]:
#Using rdd to generate the schema 

json_schema_df = spark.read.json(df_jraw.rdd.map(lambda row: row.raw_data))
json_schema = json_schema_df.schema

                                                                                

In [62]:
json_schema_df.show(truncate=False)

+-------+----------+-----+----------------------------+
|dept   |fname     |lname|skills                      |
+-------+----------+-----+----------------------------+
|account|Ramesh    |Singh|[excel, tally, word]        |
|sales  |Siv       |Kumar|[biking, sales]             |
|hr     |MS Raghvan|null |[communication, soft-skills]|
+-------+----------+-----+----------------------------+



In [65]:
df_newj = df_jraw.withColumn("parsed_data",
                             from_json(df_jraw['raw_data'], 
                                       schema=json_schema))

In [66]:
df_newj.printSchema()

root
 |-- emp_no: string (nullable = true)
 |-- raw_data: string (nullable = true)
 |-- parsed_data: struct (nullable = true)
 |    |-- dept: string (nullable = true)
 |    |-- fname: string (nullable = true)
 |    |-- lname: string (nullable = true)
 |    |-- skills: array (nullable = true)
 |    |    |-- element: string (containsNull = true)



In [73]:
df_newj.select("emp_no","parsed_data.*"). \
        withColumn('Skills',explode("skills")). \
        drop("parsed_data"). \
        show(10, False)

+------+-------+----------+-----+-------------+
|emp_no|dept   |fname     |lname|Skills       |
+------+-------+----------+-----+-------------+
|EMP001|account|Ramesh    |Singh|excel        |
|EMP001|account|Ramesh    |Singh|tally        |
|EMP001|account|Ramesh    |Singh|word         |
|EMP002|sales  |Siv       |Kumar|biking       |
|EMP002|sales  |Siv       |Kumar|sales        |
|EMP003|hr     |MS Raghvan|null |communication|
|EMP003|hr     |MS Raghvan|null |soft-skills  |
+------+-------+----------+-----+-------------+



In [74]:
# Python function to flatten the data dynamically
from pyspark.sql import DataFrame

# Create outer method to return the flattened Data Frame
def flatten_json_df(_df: DataFrame) -> DataFrame:
    # List to hold the dynamically generated column names
    flattened_col_list = []
    
    # Inner method to iterate over Data Frame to generate the column list
    def get_flattened_cols(df: DataFrame, struct_col: str = None) -> None:
        for col in df.columns:
            if df.schema[col].dataType.typeName() != 'struct':
                if struct_col is None:
                    flattened_col_list.append(f"{col} as {col.replace('.','_')}")
                else:
                    t = struct_col + "." + col
                    flattened_col_list.append(f"{t} as {t.replace('.','_')}")
            else:
                chained_col = struct_col +"."+ col if struct_col is not None else col
                get_flattened_cols(df.select(col+".*"), chained_col)
    
    # Call the inner Method
    get_flattened_cols(_df)
    
    # Return the flattened Data Frame
    return _df.selectExpr(flattened_col_list)

In [76]:
flattened_df = flatten_json_df(df_newj)

In [77]:
flattened_df.printSchema()

root
 |-- emp_no: string (nullable = true)
 |-- raw_data: string (nullable = true)
 |-- parsed_data_dept: string (nullable = true)
 |-- parsed_data_fname: string (nullable = true)
 |-- parsed_data_lname: string (nullable = true)
 |-- parsed_data_skills: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [89]:
flattened_df.withColumn("skills",explode("parsed_data_skills")). \
        drop("parsed_data_skills","parsed_data_skills",
             "raw_data","parsed_data_dept"). \
        show(truncate=True)

+------+-----------------+-----------------+-------------+
|emp_no|parsed_data_fname|parsed_data_lname|       skills|
+------+-----------------+-----------------+-------------+
|EMP001|           Ramesh|            Singh|        excel|
|EMP001|           Ramesh|            Singh|        tally|
|EMP001|           Ramesh|            Singh|         word|
|EMP002|              Siv|            Kumar|       biking|
|EMP002|              Siv|            Kumar|        sales|
|EMP003|       MS Raghvan|             null|communication|
|EMP003|       MS Raghvan|             null|  soft-skills|
+------+-----------------+-----------------+-------------+



In [91]:
# merging dataframes
_data1 = [
    ["K101","newFrame",28,"5-05-2023"],
    ["K201","fineGram",79,"21-07-2021"],
    ["K103","urad",57,"02-07-2022"]
]

_col1 = ["ID","NAME","Age","DOB"]

df_1 = spark.createDataFrame(data=_data1,schema=_col1)
df_1.show()

+----+--------+---+----------+
|  ID|    NAME|Age|       DOB|
+----+--------+---+----------+
|K101|newFrame| 28| 5-05-2023|
|K201|fineGram| 79|21-07-2021|
|K103|    urad| 57|02-07-2022|
+----+--------+---+----------+



In [92]:
_data2 = [
    ["c101","Saku","Indore",["new Science", "physics"]],
    ["D105","Godu","Nokrum",["philo","Physics","Alchemy"]],
    ["Y056","Nokka","wakrund",["reunmod","rune sophy","magic"]]
] 

_col2 = ["ID","Name","Place","Subjects"]

In [93]:
df_2 = spark.createDataFrame(data=_data2,schema=_col2)
df_2.show()

+----+-----+-------+--------------------+
|  ID| Name|  Place|            Subjects|
+----+-----+-------+--------------------+
|c101| Saku| Indore|[new Science, phy...|
|D105| Godu| Nokrum|[philo, Physics, ...|
|Y056|Nokka|wakrund|[reunmod, rune so...|
+----+-----+-------+--------------------+



In [95]:
#Before merging the missing cols to be added

for col in df_2.columns:
    if col not in df_1.columns:
        df_1 = df_1.withColumn(col,lit(None))
        
for col in df_1.columns:
    if col not in df_2.columns:
        df_2 = df_2.withColumn(col,lit(None))
        
df_1.show()
df_2.show()

+----+----+---+----------+-----+--------+
|  ID|Name|Age|       DOB|Place|Subjects|
+----+----+---+----------+-----+--------+
|K101|null| 28| 5-05-2023| null|    null|
|K201|null| 79|21-07-2021| null|    null|
|K103|null| 57|02-07-2022| null|    null|
+----+----+---+----------+-----+--------+

+----+-----+-------+--------------------+----+----+
|  ID| Name|  Place|            Subjects| Age| DOB|
+----+-----+-------+--------------------+----+----+
|c101| Saku| Indore|[new Science, phy...|null|null|
|D105| Godu| Nokrum|[philo, Physics, ...|null|null|
|Y056|Nokka|wakrund|[reunmod, rune so...|null|null|
+----+-----+-------+--------------------+----+----+



In [96]:
dunion = df_1.unionByName(df_2)
dunion.show()

+----+-----+----+----------+-------+--------------------+
|  ID| Name| Age|       DOB|  Place|            Subjects|
+----+-----+----+----------+-------+--------------------+
|K101| null|  28| 5-05-2023|   null|                null|
|K201| null|  79|21-07-2021|   null|                null|
|K103| null|  57|02-07-2022|   null|                null|
|c101| Saku|null|      null| Indore|[new Science, phy...|
|D105| Godu|null|      null| Nokrum|[philo, Physics, ...|
|Y056|Nokka|null|      null|wakrund|[reunmod, rune so...|
+----+-----+----+----------+-------+--------------------+



In [97]:
_datafp = [
	["Ramesh", "PHY", 90],
	["Ramesh", "MATH", 95],
	["Ramesh", "CHEM", 100],
	["Sangeeta", "PHY", 90],
	["Sangeeta", "MATH", 100],
	["Sangeeta", "CHEM", 83],
	["Mohan", "BIO", 90],
	["Mohan", "MATH", 70],
	["Mohan", "CHEM", 76],
	["Imran", "PHY", 96],
	["Imran", "MATH", 87],
	["Imran", "CHEM", 79],
	["Imran", "BIO", 82]
]

_colsfp = ["NAME", "SUBJECT", "MARKS"]

# Generate Data Frame
df = spark.createDataFrame(data=_datafp, schema = _colsfp)
df.show(truncate = False)

+--------+-------+-----+
|NAME    |SUBJECT|MARKS|
+--------+-------+-----+
|Ramesh  |PHY    |90   |
|Ramesh  |MATH   |95   |
|Ramesh  |CHEM   |100  |
|Sangeeta|PHY    |90   |
|Sangeeta|MATH   |100  |
|Sangeeta|CHEM   |83   |
|Mohan   |BIO    |90   |
|Mohan   |MATH   |70   |
|Mohan   |CHEM   |76   |
|Imran   |PHY    |96   |
|Imran   |MATH   |87   |
|Imran   |CHEM   |79   |
|Imran   |BIO    |82   |
+--------+-------+-----+



In [98]:
# Lets create a simple Python decorator - {get_time} to get the execution timings
# If you dont know about Python decorators - check out : https://www.geeksforgeeks.org/decorators-in-python/
import time

def get_time(func):
    def inner_get_time() -> str:
        start_time = time.time()
        func()
        end_time = time.time()
        return (f"Execution time: {(end_time - start_time)*1000} ms")
    print(inner_get_time())

In [100]:
@get_time

def x(): df.groupBy("NAME").pivot("SUBJECT").agg(sum("MARKS"))

Execution time: 822.1645355224609 ms


In [101]:
pivot_df_1 = df.groupBy("NAME").pivot("SUBJECT").agg(sum("MARKS"))
pivot_df_1.printSchema()
pivot_df_1.show(truncate = False)

root
 |-- NAME: string (nullable = true)
 |-- BIO: long (nullable = true)
 |-- CHEM: long (nullable = true)
 |-- MATH: long (nullable = true)
 |-- PHY: long (nullable = true)





+--------+----+----+----+----+
|NAME    |BIO |CHEM|MATH|PHY |
+--------+----+----+----+----+
|Mohan   |90  |76  |70  |null|
|Ramesh  |null|100 |95  |90  |
|Imran   |82  |79  |87  |96  |
|Sangeeta|null|83  |100 |90  |
+--------+----+----+----+----+



                                                                                

In [103]:
df.select("SUBJECT").distinct().show()

+-------+
|SUBJECT|
+-------+
|   MATH|
|    PHY|
|   CHEM|
|    BIO|
+-------+



In [104]:
@get_time
def x(): df.select("SUBJECT").distinct().rdd.map(lambda x: x[0]).collect()

Execution time: 610.4531288146973 ms


In [105]:
# Get the distinct list of Subjects
_subjects = df.select("SUBJECT").distinct().rdd.map(lambda x: x[0]).collect()
_subjects

['MATH', 'PHY', 'CHEM', 'BIO']

In [106]:
# Lets check the data and schema
pivot_df_2 = df.groupBy("NAME").pivot("SUBJECT", _subjects).agg(sum("MARKS"))
pivot_df_2.printSchema()
pivot_df_2.show(truncate = False)

root
 |-- NAME: string (nullable = true)
 |-- MATH: long (nullable = true)
 |-- PHY: long (nullable = true)
 |-- CHEM: long (nullable = true)
 |-- BIO: long (nullable = true)

+--------+----+----+----+----+
|NAME    |MATH|PHY |CHEM|BIO |
+--------+----+----+----+----+
|Mohan   |70  |null|76  |90  |
|Ramesh  |95  |90  |100 |null|
|Imran   |87  |96  |79  |82  |
|Sangeeta|100 |90  |83  |null|
+--------+----+----+----+----+



In [108]:
#Creating and registering udfs

_data = [
    [1, ["Bangalore", "Mumbai", "Pune", "Indore"]],
    [2, ["Bangalore"]],
    [3, []],
    [4, ["Kolkata", "Bhubaneshwar"]],
    [5, ["Bangalore", "Mumbai", "Pune", "Indore", "Ahmedabad", "Suratkal"]],
    [6, ["Delhi", "Mumbai", "Kolkāta", "Bangalore", "Chennai", "Hyderābād", "Pune", "Ahmedabad", "Sūrat", "Lucknow", "Jaipur", "Cawnpore", "Mirzāpur", "Nāgpur", "Ghāziābād", "Indore", "Vadodara", "Vishākhapatnam", "Bhopāl", "Chinchvad", "Patna", "Ludhiāna", "Āgra", "Kalyān", "Madurai", "Jamshedpur", "Nāsik", "Farīdābād", "Aurangābād", "Rājkot", "Meerut", "Jabalpur", "Thāne", "Dhanbād", "Allahābād", "Vārānasi", "Srīnagar", "Amritsar", "Alīgarh", "Bhiwandi", "Gwalior", "Bhilai", "Hāora", "Rānchi", "Bezwāda", "Chandīgarh", "Mysore", "Raipur", "Kota", "Bareilly", "Jodhpur", "Coimbatore", "Dispur", "Guwāhāti", "Solāpur", "Trichinopoly", "Hubli", "Jalandhar", "Bhubaneshwar", "Bhayandar", "Morādābād", "Kolhāpur", "Thiruvananthapuram", "Sahāranpur", "Warangal", "Salem", "Mālegaon", "Kochi", "Gorakhpur", "Shimoga", "Tiruppūr", "Guntūr", "Raurkela", "Mangalore", "Nānded", "Cuttack", "Chānda", "Dehra Dūn", "Durgāpur", "Āsansol", "Bhāvnagar", "Amrāvati", "Nellore", "Ajmer", "Tinnevelly", "Bīkaner", "Agartala", "Ujjain", "Jhānsi", "Ulhāsnagar", "Davangere", "Jammu", "Belgaum", "Gulbarga", "Jāmnagar", "Dhūlia", "Gaya", "Jalgaon", "Kurnool", "Udaipur", "Bellary", "Sāngli", "Tuticorin", "Calicut", "Akola", "Bhāgalpur", "Sīkar", "Tumkūr", "Quilon", "Muzaffarnagar", "Bhīlwāra", "Nizāmābād", "Bhātpāra", "Kākināda", "Parbhani", "Pānihāti", "Lātūr", "Rohtak", "Rājapālaiyam", "Ahmadnagar", "Cuddapah", "Rājahmundry", "Alwar", "Muzaffarpur", "Bilāspur", "Mathura", "Kāmārhāti", "Patiāla", "Saugor", "Bijāpur", "Brahmapur", "Shāhjānpur", "Trichūr", "Barddhamān", "Kulti", "Sambalpur", "Purnea", "Hisar", "Fīrozābād", "Bīdar", "Rāmpur", "Shiliguri", "Bāli", "Pānīpat", "Karīmnagar", "Bhuj", "Ichalkaranji", "Tirupati", "Hospet", "Āīzawl", "Sannai", "Bārāsat", "Ratlām", "Handwāra", "Drug", "Imphāl", "Anantapur", "Etāwah", "Rāichūr", "Ongole", "Bharatpur", "Begusarai", "Sonīpat", "Rāmgundam", "Hāpur", "Uluberiya", "Porbandar", "Pāli", "Vizianagaram", "Puducherry", "Karnāl", "Nāgercoil", "Tanjore", "Sambhal", "Naihāti", "Secunderābād", "Kharagpur", "Dindigul", "Shimla", "Ingrāj Bāzār", "Ellore", "Puri", "Haldia", "Nandyāl", "Bulandshahr", "Chakradharpur", "Bhiwāni", "Gurgaon", "Burhānpur", "Khammam", "Madhyamgram", "Ghāndīnagar", "Baharampur", "Mahbūbnagar", "Mahesāna", "Ādoni", "Rāiganj", "Bhusāval", "Bahraigh", "Shrīrāmpur", "Tonk", "Sirsa", "Jaunpur", "Madanapalle", "Hugli", "Vellore", "Alleppey", "Cuddalore", "Deo", "Chīrāla", "Machilīpatnam", "Medinīpur", "Bāramūla", "Chandannagar", "Fatehpur", "Udipi", "Tenāli", "Sitalpur", "Conjeeveram", "Proddatūr", "Navsāri", "Godhra", "Budaun", "Chittoor", "Harīpur", "Saharsa", "Vidisha", "Pathānkot", "Nalgonda", "Dibrugarh", "Bālurghāt", "Krishnanagar", "Fyzābād", "Silchar", "Shāntipur", "Hindupur"]]
]

_cols = ["id", "cities"]

# Create Data Frame
df = spark.createDataFrame(data = _data, schema = _cols)
df.printSchema()
df.show()

root
 |-- id: long (nullable = true)
 |-- cities: array (nullable = true)
 |    |-- element: string (containsNull = true)





+---+--------------------+
| id|              cities|
+---+--------------------+
|  1|[Bangalore, Mumba...|
|  2|         [Bangalore]|
|  3|                  []|
|  4|[Kolkata, Bhubane...|
|  5|[Bangalore, Mumba...|
|  6|[Delhi, Mumbai, K...|
+---+--------------------+



In [109]:
def len_of_cities(col):
    _len = 0
    for i in col:
        _len += len(i)
    return _len

#Above function is made into an udf using the lambda function

len_of_cities_udf = udf(lambda x: len_of_cities(x))

In [110]:
df.withColumn("udf_len", len_of_cities_udf("cities")).show()

                                                                                

+---+--------------------+-------+
| id|              cities|udf_len|
+---+--------------------+-------+
|  1|[Bangalore, Mumba...|     25|
|  2|         [Bangalore]|      9|
|  3|                  []|      0|
|  4|[Kolkata, Bhubane...|     19|
|  5|[Bangalore, Mumba...|     42|
|  6|[Delhi, Mumbai, K...|   1806|
+---+--------------------+-------+



In [111]:
df.withColumn("len_of_cities", aggregate("cities", lit(0), lambda x, y: x + length(y))).show()

+---+--------------------+-------------+
| id|              cities|len_of_cities|
+---+--------------------+-------------+
|  1|[Bangalore, Mumba...|           25|
|  2|         [Bangalore]|            9|
|  3|                  []|            0|
|  4|[Kolkata, Bhubane...|           19|
|  5|[Bangalore, Mumba...|           42|
|  6|[Delhi, Mumbai, K...|         1806|
+---+--------------------+-------------+



In [113]:
# Lets create the example dataset of fact and dimesion we would use for demonstration
# Python program to generate random Fact table data
# [1, ,"ORD1001", "D102", 56]
import random


def generate_fact_data(counter=100):
    fact_records = []
    dim_keys = ["D100", "D101", "D102", "D103", "D104"]
    order_ids = ["ORD" + str(i) for i in range(1001, 1010)]
    qty_range = [i for i in range(10, 120)]
    for i in range(counter):
        _record = [i, random.choice(order_ids), random.choice(dim_keys), random.choice(qty_range)]
        fact_records.append(_record)
    return fact_records

# We will generate 200 records with random data in Fact to create skewness
fact_records = generate_fact_data(200)

dim_records = [
    ["D100", "Product A"],
    ["D101", "Product B"],
    ["D102", "Product C"],
    ["D103", "Product D"],
    ["D104", "Product E"]
]

_fact_cols = ["id", "order_id", "prod_id", "qty"]
_dim_cols = ["prod_id", "prod_name"]

In [115]:
fact_df = spark.createDataFrame(data=fact_records,schema=_fact_cols)
fact_df.show(2)

+---+--------+-------+---+
| id|order_id|prod_id|qty|
+---+--------+-------+---+
|  0| ORD1003|   D103| 19|
|  1| ORD1002|   D104| 29|
+---+--------+-------+---+
only showing top 2 rows



In [116]:
dim_df = spark.createDataFrame(data=dim_records,schema=_dim_cols)
dim_df.show()

+-------+---------+
|prod_id|prod_name|
+-------+---------+
|   D100|Product A|
|   D101|Product B|
|   D102|Product C|
|   D103|Product D|
|   D104|Product E|
+-------+---------+



In [117]:
# Set Spark parameters - We have to turn off AQL to demonstrate Salting
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.shuffle.partitions", 5)

# Check the parameters
print(spark.conf.get("spark.sql.adaptive.enabled"))
print(spark.conf.get("spark.sql.shuffle.partitions"))

false
5


In [119]:
joinDF = fact_df.join(dim_df,on="prod_id",how="leftouter")
joinDF.show(10)



+-------+---+--------+---+---------+
|prod_id| id|order_id|qty|prod_name|
+-------+---+--------+---+---------+
|   D103|  0| ORD1003| 19|Product D|
|   D103|  2| ORD1007| 52|Product D|
|   D103|100| ORD1003| 44|Product D|
|   D103|151| ORD1004| 92|Product D|
|   D103|152| ORD1001|119|Product D|
|   D104|  1| ORD1002| 29|Product E|
|   D104| 59| ORD1009| 44|Product E|
|   D104|104| ORD1003| 38|Product E|
|   D104|158| ORD1002| 73|Product E|
|   D104|159| ORD1005| 80|Product E|
+-------+---+--------+---+---------+
only showing top 10 rows



In [125]:
partition_df = joinDF.withColumn("partition_num", 
                                    spark_partition_id())\
        .groupBy("partition_num").agg(count("id"))

partition_df.show()

+-------------+---------+
|partition_num|count(id)|
+-------------+---------+
|            4|      117|
|            2|       83|
+-------------+---------+



In [126]:
# UDF to return a random number every time
def rand(): return random.randint(0, 4) #Since we are distributing the data in 5 partitions
rand_udf = udf(rand)

# Salt Data Frame to add to dimension
salt_df = spark.range(0, 5)
salt_df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [127]:
salted_fact_df = fact_df.withColumn("salted_prod_id",
                                    concat("prod_id",lit("_"), 
                                           lit(rand_udf())))
salted_fact_df.show(10, False)

+---+--------+-------+---+--------------+
|id |order_id|prod_id|qty|salted_prod_id|
+---+--------+-------+---+--------------+
|0  |ORD1003 |D103   |19 |D103_4        |
|1  |ORD1002 |D104   |29 |D104_4        |
|2  |ORD1007 |D103   |52 |D103_0        |
|3  |ORD1001 |D101   |117|D101_0        |
|4  |ORD1009 |D101   |105|D101_4        |
|5  |ORD1004 |D101   |78 |D101_4        |
|6  |ORD1003 |D101   |106|D101_0        |
|7  |ORD1004 |D101   |56 |D101_3        |
|8  |ORD1002 |D102   |119|D102_1        |
|9  |ORD1008 |D102   |116|D102_0        |
+---+--------+-------+---+--------------+
only showing top 10 rows



In [132]:
salted_dim_df = dim_df.join(salt_df, how="cross")
salted_dim_df.show(2)

+-------+---------+---+
|prod_id|prod_name| id|
+-------+---------+---+
|   D100|Product A|  0|
|   D100|Product A|  1|
+-------+---------+---+
only showing top 2 rows



In [133]:
salted_dim_df = dim_df.join(salt_df, how="cross").withColumn("salted_prod_id", concat("prod_id", lit("_"), "id")).drop("id")

salted_dim_df.show(2)

+-------+---------+--------------+
|prod_id|prod_name|salted_prod_id|
+-------+---------+--------------+
|   D100|Product A|        D100_0|
|   D100|Product A|        D100_1|
+-------+---------+--------------+
only showing top 2 rows



In [134]:
salted_joined_df = salted_fact_df.join(salted_dim_df, on="salted_prod_id", how="leftouter")
salted_joined_df.show(10, False)

                                                                                

+--------------+---+--------+-------+---+-------+---------+
|salted_prod_id|id |order_id|prod_id|qty|prod_id|prod_name|
+--------------+---+--------+-------+---+-------+---------+
|D100_0        |107|ORD1004 |D100   |86 |D100   |Product A|
|D101_1        |5  |ORD1004 |D101   |78 |D101   |Product B|
|D101_1        |52 |ORD1005 |D101   |105|D101   |Product B|
|D101_1        |102|ORD1004 |D101   |82 |D101   |Product B|
|D103_1        |152|ORD1001 |D103   |119|D103   |Product D|
|D103_3        |2  |ORD1007 |D103   |52 |D103   |Product D|
|D103_3        |151|ORD1004 |D103   |92 |D103   |Product D|
|D104_0        |59 |ORD1009 |D104   |44 |D104   |Product E|
|D104_0        |159|ORD1005 |D104   |80 |D104   |Product E|
|D104_2        |104|ORD1003 |D104   |38 |D104   |Product E|
+--------------+---+--------+-------+---+-------+---------+
only showing top 10 rows



In [135]:
partition_df = salted_joined_df.withColumn("partition_num", spark_partition_id()).groupBy("partition_num") \
    .agg(count(lit(1)).alias("count")).orderBy("partition_num")

partition_df.show()



+-------------+-----+
|partition_num|count|
+-------------+-----+
|            0|   22|
|            1|   56|
|            2|   28|
|            3|   59|
|            4|   35|
+-------------+-----+



                                                                                

In [None]:
# Dataset read without specifying the schema

df_sales = spark \
    .read \
    .format("parquet") \
    .load("dataset/sales.parquet")

# Dataset read with schema
_schema = "transacted_at STRING, trx_id STRING, retailer_id STRING, description STRING, amount STRING, city_id STRING"

df_sales = spark \
    .read \
    .schema(_schema) \
    .format("parquet") \
    .load("dataset/sales.parquet")

df_sales = spark \
    .read \
    .schema(_required_schema) \
    .format("parquet") \
    .load("dataset/sales.parquet")

df_sales = spark \
    .read \
    .schema(_schema) \
    .parquet("dataset/sales.parquet") \
    .select("transacted_at", "trx_id", "amount")

df_sales = spark \
    .read \
    .schema(_schema) \
    .parquet("dataset/sales.parquet") \
    .drop("retailer_id", "description", "city_id")
