<h1><center>Introduction to Google Colab and PySpark</center></h1>

In [4]:
! pip install pycryptodome
! pip install pyspark

Collecting pycryptodome
  Downloading pycryptodome-3.20.0-cp35-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Downloading pycryptodome-3.20.0-cp35-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pycryptodome
Successfully installed pycryptodome-3.20.0
Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=35edfc99c134a8f7cfa2014c50b475534c674428a0b9cb9ae2ab6f3fd58f1725
  Stored in directory: /root/.cache/pip/wheel

In [5]:
from typing import Optional, List, Dict, Any, Tuple
from pyspark.sql import SparkSession
from Crypto.Cipher import AES
from io import BytesIO
import secrets
#import pandas as pd
import collections
import binascii
import datetime
import random
import string
import uuid
import json
import time
import os


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("CreateDataFrameFunction").getOrCreate()


In [None]:
"""FUNCTION DEFINITIONS"""
# Use an all-zero 16-byte (128-bit) IV for AES-CBC

IV = binascii.unhexlify("00000000000000000000000000000000")


def aes_128_cbc_encrypt(key, text):
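    """AES-CBC encryption with the fixed all-zero IV.

    Despite the function name, the key length selects the AES variant:
    the 32-byte K4 transport key used below gives AES-256.

    Args:
        key (str): secret key as a hex string
        text (str): plaintext as a hex string (a multiple of 16 bytes)

    Returns:
        str: ciphertext as an uppercase hex string
    """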
    keyb = binascii.unhexlify(key)
    textb = binascii.unhexlify(text)

    # ===========================#
    # ==========E[KI]K4==========#
    # ===========================#

    encryptor = AES.new(keyb, AES.MODE_CBC, IV=IV)
    ciphertext = encryptor.encrypt(textb)
    return ciphertext.hex().upper()


def gen_ki():
    """Generate a random 128-bit KI using the `secrets` module.

    Switched from uuid4 to `secrets` because the randomness test report for
    uuid4 was weaker (change by Hamza).

    Returns:
        str: random 128-bit key as uppercase hex
    """
    return secrets.token_hex(16).upper()

def gen_op():
    """Return the fixed operator variant OP value.

    Random generation (commented out below) is disabled.

    Returns:
        str: 128-bit OP as uppercase hex
    """
    # return secrets.token_hex(16).upper()
    return "59EC8D16FB1724573DE8E6E33A660EEE"


def gen_k4():
    """Return the fixed 256-bit transport key K4.

    Random generation (commented out below) is disabled.

    Returns:
        str: 256-bit K4 as uppercase hex
    """
    # return secrets.token_hex(32).upper()
    return "07E751ADDA7E4D9B284A6FCB7F9315FDA1F2D7A9B2BCA8D264652218AF61DEC5"




def gen_opc(op, ki):
    return calc_opc_hex(ki, op).upper()


def xor_str(s, t):
    return bytes([_a ^ _b for _a, _b in zip(s, t)])


def calc_opc_hex(k_hex: str, op_hex: str) -> str:
    """Calculate OPc from KI (secret key) and OP (Operator Variant Algorithm Configuration Field).
    Reference: ETSI TS 135 206 (3GPP TS 35.206) V14.0.0

    Args:
        k_hex (str): Secret key
        op_hex (str): OP key

    Returns:
        str: OPc
    """

    iv = binascii.unhexlify(16 * "00")
    ki = binascii.unhexlify(k_hex)
    op = binascii.unhexlify(op_hex)
    # ===========================#
    # =========E[OP]k============#
    # ===========================#

    aes_crypt = AES.new(ki, mode=AES.MODE_CBC, IV=iv)

    data = op
    # ===========================#
    # ========E[OP]k^OP==========#
    # ===========================#
    o_pc = xor_str(data, aes_crypt.encrypt(data))
    return o_pc.hex().upper()


def gen_eki(transport, ki):
    return aes_128_cbc_encrypt(transport, ki)


def gen_opc_eki(op, transport, ki):
    return {"opc": gen_opc(op, ki), "eki": gen_eki(transport, ki)}


def lenfunc(x):
    return len(x)


def generate_8_Digit():
    return str(secrets.SystemRandom().randint(10000000, 99999999))


# def generate_16_Digit():
#     return str(secrets.SystemRandom().randint(1000000000000000, 9999999999999999))


def generate_4_Digit():
    return str(secrets.SystemRandom().randint(1000, 9999))


def enc_pin_func(x):
    return rpad(s2h(x), 16).upper()


def dec_pin_func(x):
    return (h2s(x)).upper()


def enc_imsi(imsi):
    imsi = str(imsi)
    l = half_round_up(
        len(imsi) + 1
    )  # Required bytes - include space for odd/even indicator
    oe = len(imsi) & 1  # Odd (1) / Even (0)
    ei = "%02x" % l + swap_nibbles("%01x%s" % ((oe << 3) | 1, rpad(imsi, 15)))
    return ei


def dec_imsi(ef):
    ef = str(ef)
    if len(ef) < 4:
        return None
    l = int(ef[0:2], 16) * 2  # Length of the IMSI string
    l = l - 1  # Encoded length byte includes oe nibble
    swapped = swap_nibbles(ef[2:]).rstrip("f")
    if len(swapped) < 1:
        return None
    oe = (int(swapped[0]) >> 3) & 1  # Odd (1) / Even (0)
    if not oe:
        l = l - 1
    if l != len(swapped) - 1:
        return None
    imsi = swapped[1:]
    return imsi


Hexstr = str


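def enc_iccid(iccid: str) -> Hexstr:
    """Append the Luhn check digit and nibble-swap the ICCID.

    Expects the ICCID without its check digit (at most 19 digits). With a
    20-digit input, the appended check digit is silently dropped by swap_nibbles.
    """
    iccid = str(iccid)
    luhn = calculate_luhn(iccid)
    iccid = iccid + str(luhn)
    m_iccid = swap_nibbles(rpad(iccid, 20))
    return m_iccid.upper()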


def dec_iccid(ef: Hexstr) -> str:
    ef = str(ef)
    ef = ef.upper()
    iccid = swap_nibbles(ef).strip("F")
    return iccid[:-1]


def swap_nibbles(s: Hexstr) -> Hexstr:
    return "".join([x + y for x, y in zip(s[1::2], s[0::2])])


def rpad(s: str, l: int, c="f") -> str:
    return s + c * (l - len(s))


def lpad(s: str, l: int, c="f") -> str:
    return c * (l - len(s)) + s


def half_round_up(n: int) -> int:
    return (n + 1) // 2


def calculate_luhn(cc) -> int:
    num = list(map(int, str(cc)))
    check_digit = (
        10 - sum(num[-2::-2] + [sum(divmod(d * 2, 10)) for d in num[::-2]]) % 10
    )
    return 0 if check_digit == 10 else check_digit


def h2b(s: Hexstr) -> bytearray:
    return bytearray.fromhex(s)


def b2h(b: bytearray) -> Hexstr:
    return "".join(["%02x" % (x) for x in b])


def h2i(s: Hexstr) -> List[int]:
    return [(int(x, 16) << 4) + int(y, 16) for x, y in zip(s[0::2], s[1::2])]


def i2h(s: List[int]) -> Hexstr:
    return "".join(["%02x" % (x) for x in s])


def h2s(s: Hexstr) -> str:
    return "".join(
        [
            chr((int(x, 16) << 4) + int(y, 16))
            for x, y in zip(s[0::2], s[1::2])
            if int(x + y, 16) != 0xFF
        ]
    )


def s2h(s: str) -> Hexstr:
    b = bytearray()
    b.extend(map(ord, s))
    return b2h(b)


def i2s(s: List[int]) -> str:
    return "".join([chr(x) for x in s])


def integer_2_ascii(x):
    return s2h(x)


def ascii_2_integer(x):
    return h2s(str(x))


def apply_luhn_check(x):
    return str(x) + str(calculate_luhn(str(x)))


def copy_function(x):
    return str(x)


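def ACC_function(imsi: str) -> str:
    """Derive the access control class (ACC) from the last IMSI digit.

    Access class N (0-9) sets bit N of a 16-bit field, returned as 4 hex digits.
    """
    last_digit = int(imsi[-1])
    acc_binary = bin(1 << last_digit)[2:].zfill(16)
    return format(int(acc_binary, 2), "04x")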
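`calculate_luhn` computes the check digit that `apply_luhn_check` and `enc_iccid` append. The sketch below checks that formula against the classic Luhn example (payload `7992739871`, check digit `3`). `luhn_check_digit` mirrors `calculate_luhn`. `luhn_is_valid` is a hypothetical textbook validator added only for this cross-check.

```python
def luhn_check_digit(payload: str) -> int:
    """Check digit for `payload`, using the same formula as calculate_luhn."""
    num = list(map(int, payload))
    # Double every second digit, starting from the rightmost payload digit
    doubled = [sum(divmod(d * 2, 10)) for d in num[::-2]]
    total = sum(num[-2::-2]) + sum(doubled)
    return (10 - total % 10) % 10


def luhn_is_valid(number: str) -> bool:
    """Textbook Luhn validation of a complete number (check digit included)."""
    total = 0
    for i, ch in enumerate(reversed(number)):
        d = int(ch)
        if i % 2 == 1:  # every second digit, counting from the check digit
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0


payload = "7992739871"
check = luhn_check_digit(payload)
print(check)                                # 3
print(luhn_is_valid(payload + str(check)))  # True
print(luhn_is_valid(payload + "0"))         # False
```

Doubling from the right is what makes the check digit position-aware. A single mistyped digit or most swaps of adjacent digits change the sum modulo 10.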



In [None]:

# Example usage
imsi_value = 410011112222000
iccid_value = 89012345678901000000
num_rows = 5

# Define the schema; both identifiers are kept as strings
schema = StructType([
    StructField("IMSI", StringType(), False),
    StructField("ICCID", StringType(), False),
    # PIN1, PUK1, PIN2, PUK2, KI, EKI, OPC and ACC are added later with UDFs
])

# Create sequential IMSI/ICCID pairs
data = [(str(imsi_value + i), str(iccid_value + i)) for i in range(num_rows)]

# Create the DataFrame
df = spark.createDataFrame(data, schema)

# Show the schema and the rows
df.printSchema()
df.show()


root
 |-- IMSI: string (nullable = false)
 |-- ICCID: string (nullable = false)

+---------------+--------------------+
|           IMSI|               ICCID|
+---------------+--------------------+
|410011112222000|89012345678901000000|
|410011112222001|89012345678901000001|
|410011112222002|89012345678901000002|
|410011112222003|89012345678901000003|
|410011112222004|89012345678901000004|
+---------------+--------------------+
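`enc_imsi` packs an IMSI such as the first row above into the card's binary layout. The layout is a length byte, then the digits in nibble-swapped BCD. The first nibble carries the odd/even indicator (bit 3) and identity type 1. The stdlib sketch below repeats those steps on `410011112222000`. `dec_imsi` here is a simplified inverse without the length checks of the notebook's version.

```python
def swap_nibbles(s: str) -> str:
    # Swap the two hex digits of every byte, e.g. "94" -> "49"
    return "".join(x + y for x, y in zip(s[1::2], s[0::2]))


def enc_imsi(imsi: str) -> str:
    # Same steps as enc_imsi above
    n_bytes = (len(imsi) + 2) // 2        # digits plus the parity/type nibble
    odd = len(imsi) & 1                   # odd/even indicator
    first = "%01x" % ((odd << 3) | 1)     # parity bit + identity type 1
    body = (first + imsi).ljust(16, "f")  # pad to 8 bytes with 'f'
    return "%02x" % n_bytes + swap_nibbles(body)


def dec_imsi(ef: str) -> str:
    # Simplified inverse: undo the swap, drop padding and the first nibble
    return swap_nibbles(ef[2:]).rstrip("f")[1:]


encoded = enc_imsi("410011112222000")
print(encoded)            # 084901101121220200
print(dec_imsi(encoded))  # 410011112222000
```

The 18-character result matches the width of the encoded IMSI column printed later by `DataFrameProcessor.encode`.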



In [2]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, upper, udf

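# Register the functions as UDFs. The random generators are marked non-deterministic:
# Spark assumes UDFs are deterministic and may otherwise call them more or fewer times than written.
ki_udf = udf(gen_ki, StringType()).asNondeterministic()
pin1_udf = udf(generate_4_Digit, StringType()).asNondeterministic()
pin2_udf = udf(generate_4_Digit, StringType()).asNondeterministic()
puk1_udf = udf(generate_8_Digit, StringType()).asNondeterministic()
puk2_udf = udf(generate_8_Digit, StringType()).asNondeterministic()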

eki_udf = udf(gen_eki, StringType())
opc_udf = udf(gen_opc, StringType())


acc_udf = udf(ACC_function, StringType())

k4_udf = udf(gen_k4, StringType())
op_udf = udf(gen_op, StringType())


# Add new columns to the DataFrame using UDFs
df_with_columns = df.withColumn("PIN1", pin1_udf()) \
                    .withColumn("PUK1", puk1_udf()) \
                    .withColumn("PIN2", pin2_udf()) \
                    .withColumn("PUK2", puk2_udf()) \
                    .withColumn("K4", k4_udf()) \
                    .withColumn("OP", op_udf()) \
                    .withColumn("KI", ki_udf()) \
                    .withColumn("EKI", eki_udf(col("K4"),col("KI"))) \
                    .withColumn("OPC", opc_udf(col("OP"),col("KI"))) \
                    .withColumn("ACC", acc_udf(col("IMSI")))

# Show the result
df_with_columns.show(truncate=False)



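The `ACC` column is a one-hot 16-bit field. The notebook takes the access class from the last IMSI digit and sets that bit. The sketch below, with a hypothetical helper `acc_hex`, computes the same value as `ACC_function` directly with a shift, skipping the round trip through a binary string.

```python
def acc_hex(imsi: str) -> str:
    # Same result as ACC_function: access class N (the last IMSI digit) sets bit N
    return format(1 << int(imsi[-1]), "04x")


for imsi in ["410011112222000", "410011112222005", "410011112222009"]:
    print(imsi, acc_hex(imsi))
# 410011112222000 0001
# 410011112222005 0020
# 410011112222009 0200
```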

In [6]:
import binascii
import secrets
from typing import List, Dict, Union
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, StructType, StructField
# AES related imports
from Crypto.Cipher import AES


In [7]:


class EncryptionUtils:
    IV = binascii.unhexlify("00000000000000000000000000000000")

    @staticmethod
    def aes_128_cbc_encrypt(key: str, text: str) -> str:
        """AES-CBC encryption with a fixed all-zero IV.

        The key length selects the AES variant (the 32-byte K4 gives AES-256).

        Args:
            key (str): Secret key as a hex string.
            text (str): Text to be encrypted as a hex string.

        Returns:
            str: Encrypted text as a hex string.
        """
        keyb = binascii.unhexlify(key)
        textb = binascii.unhexlify(text)

        encryptor = AES.new(keyb, AES.MODE_CBC, IV=EncryptionUtils.IV)
        ciphertext = encryptor.encrypt(textb)
        return ciphertext.hex().upper()

    @staticmethod
    def xor_str(s: bytes, t: bytes) -> bytes:
        return bytes([_a ^ _b for _a, _b in zip(s, t)])

    @staticmethod
    def calc_opc_hex(k_hex: str, op_hex: str) -> str:
        """Calculate OPc from KI and OP values using AES encryption.

        Args:
            k_hex (str): Secret key (KI) as a hex string.
            op_hex (str): Operator key (OP) as a hex string.

        Returns:
            str: Calculated OPc as a hex string.
        """
        iv = binascii.unhexlify(16 * "00")
        ki = binascii.unhexlify(k_hex)
        op = binascii.unhexlify(op_hex)

        aes_crypt = AES.new(ki, mode=AES.MODE_CBC, IV=iv)
        o_pc = EncryptionUtils.xor_str(op, aes_crypt.encrypt(op))
        return o_pc.hex().upper()


class KeyGenerator:
    @staticmethod
    def gen_ki() -> str:
        """Generate a random 128-bit key (KI)."""
        return secrets.token_hex(16).upper()

    @staticmethod
    def gen_op() -> str:
        """Return the fixed operator variant OP value."""
        return "59EC8D16FB1724573DE8E6E33A660EEE"

    @staticmethod
    def gen_k4() -> str:
        """Return the fixed 256-bit transport key K4."""
        return "07E751ADDA7E4D9B284A6FCB7F9315FDA1F2D7A9B2BCA8D264652218AF61DEC5"

    @staticmethod
    def gen_opc(op: str, ki: str) -> str:
        """Generate OPc from OP and KI.

        OP comes first to match the call opc_udf(col("OP"), col("KI")).
        """
        return EncryptionUtils.calc_opc_hex(ki, op).upper()

    @staticmethod
    def gen_eki(transport: str, ki: str) -> str:
        """Generate EKI by encrypting KI with K4."""
        return EncryptionUtils.aes_128_cbc_encrypt(transport, ki)


class UtilityFunctions:
    @staticmethod
    def generate_4_digit() -> str:
        """Generate a random 4-digit PIN."""
        return str(secrets.SystemRandom().randint(1000, 9999))

    @staticmethod
    def generate_8_digit() -> str:
        """Generate a random 8-digit PUK."""
        return str(secrets.SystemRandom().randint(10000000, 99999999))

    @staticmethod
    def acc_function(imsi: str) -> str:
        """Calculate ACC based on IMSI.

        Args:
            imsi (str): IMSI value.

        Returns:
            str: ACC value as a hex string.
        """
        last_digit = int(imsi[-1])
        acc_binary = bin(1 << last_digit)[2:].zfill(16)
        return format(int(acc_binary, 2), "04x")


class DataFrameGenerator:
    def __init__(self, spark: SparkSession, num_rows: int):
        self.spark = spark
        self.num_rows = num_rows

    def create_dataframe(self, imsi_value: int, iccid_value: int) -> DataFrame:
        """Generate a DataFrame with IMSI and ICCID values.

        Args:
            imsi_value (int): Initial IMSI value.
            iccid_value (int): Initial ICCID value.

        Returns:
            DataFrame: Generated Spark DataFrame with IMSI and ICCID columns.
        """
        schema = StructType([
            StructField("IMSI", StringType(), False),
            StructField("ICCID", StringType(), False),
        ])

        data = [(str(imsi_value + i), str(iccid_value + i)) for i in range(self.num_rows)]
        return self.spark.createDataFrame(data, schema)


class UDFRegistrar:
    @staticmethod
    def register_udfs() -> Dict[str, udf]:
        """Register UDFs for Spark operations.

        The random generators (KI, PIN, PUK, ADM) are marked non-deterministic.
        Spark treats UDFs as deterministic by default and may then invoke them
        fewer or more times than they appear in the query.

        Returns:
            Dict[str, udf]: Registered UDFs.
        """
        return {
            "ki_udf": udf(KeyGenerator.gen_ki, StringType()).asNondeterministic(),
            "pin1_udf": udf(UtilityFunctions.generate_4_digit, StringType()).asNondeterministic(),
            "pin2_udf": udf(UtilityFunctions.generate_4_digit, StringType()).asNondeterministic(),
            "puk1_udf": udf(UtilityFunctions.generate_8_digit, StringType()).asNondeterministic(),
            "puk2_udf": udf(UtilityFunctions.generate_8_digit, StringType()).asNondeterministic(),
            "eki_udf": udf(KeyGenerator.gen_eki, StringType()),
            "opc_udf": udf(KeyGenerator.gen_opc, StringType()),
            "acc_udf": udf(UtilityFunctions.acc_function, StringType()),
            "k4_udf": udf(KeyGenerator.gen_k4, StringType()),
            "op_udf": udf(KeyGenerator.gen_op, StringType()),
            "adm_udf": udf(UtilityFunctions.generate_8_digit, StringType()).asNondeterministic(),
        }
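Both helpers encrypt exactly one 16-byte block (KI or OP) with an all-zero IV, so CBC mode reduces to the bare AES block function. Writing $E_K$ for AES under key $K$ (AES-256 for the 32-byte K4), the derived columns work out as follows. This is our reading of the code above, with OPc following the MILENAGE definition cited in `calc_opc_hex`:

```latex
\begin{aligned}
C_1 &= E_K(P_1 \oplus \mathit{IV}) = E_K(P_1 \oplus 0^{128}) = E_K(P_1) \\
\mathrm{OP_c} &= \mathrm{OP} \oplus E_{K_i}(\mathrm{OP}) \\
\mathrm{EKI} &= E_{K_4}(K_i)
\end{aligned}
```

KI is the key in the OPc formula but the plaintext in the EKI formula. That is why the argument order passed to `gen_opc` and `gen_eki` matters.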





In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

class EncodingUtils:
    @staticmethod
    def calculate_luhn(cc: str) -> int:
        """Calculate Luhn check digit.

        Args:
            cc (str): Credit card number or similar value.

        Returns:
            int: Luhn check digit.
        """
        num = list(map(int, str(cc)))
        check_digit = (10 - sum(num[-2::-2] + [sum(divmod(d * 2, 10)) for d in num[::-2]]) % 10)
        return 0 if check_digit == 10 else check_digit

    @staticmethod
    def swap_nibbles(s: str) -> str:
        """Swap nibbles in a hex string.

        Args:
            s (str): Hex string.

        Returns:
            str: Hex string with swapped nibbles.
        """
        return "".join([x + y for x, y in zip(s[1::2], s[0::2])])

    @staticmethod
    def rpad(s: str, l: int, c: str = "f") -> str:
        """Right pad a string to a specified length.

        Args:
            s (str): String to pad.
            l (int): Desired length.
            c (str): Padding character.

        Returns:
            str: Right padded string.
        """
        return s + c * (l - len(s))

    @staticmethod
    def lpad(s: str, l: int, c: str = "f") -> str:
        """Left pad a string to a specified length.

        Args:
            s (str): String to pad.
            l (int): Desired length.
            c (str): Padding character.

        Returns:
            str: Left padded string.
        """
        return c * (l - len(s)) + s

    @staticmethod
    def half_round_up(n: int) -> int:
        """Calculate half the rounded-up value of an integer.

        Args:
            n (int): Integer value.

        Returns:
            int: Rounded-up half of the integer.
        """
        return (n + 1) // 2

    @staticmethod
    def h2b(s: str) -> bytearray:
        """Convert hex string to bytearray.

        Args:
            s (str): Hex string.

        Returns:
            bytearray: Converted bytearray.
        """
        return bytearray.fromhex(s)

    @staticmethod
    def b2h(b: bytearray) -> str:
        """Convert bytearray to hex string.

        Args:
            b (bytearray): Bytearray to convert.

        Returns:
            str: Hex string.
        """
        return "".join(["%02x" % x for x in b])

    @staticmethod
    def h2i(s: str) -> List[int]:
        """Convert hex string to a list of integers.

        Args:
            s (str): Hex string.

        Returns:
            List[int]: List of integers.
        """
        return [(int(x, 16) << 4) + int(y, 16) for x, y in zip(s[0::2], s[1::2])]

    @staticmethod
    def i2h(s: List[int]) -> str:
        """Convert a list of integers to a hex string.

        Args:
            s (List[int]): List of integers.

        Returns:
            str: Hex string.
        """
        return "".join(["%02x" % x for x in s])

    @staticmethod
    def h2s(s: str) -> str:
        """Convert hex string to ASCII string.

        Args:
            s (str): Hex string.

        Returns:
            str: ASCII string.
        """
        return "".join(
            [chr((int(x, 16) << 4) + int(y, 16))
             for x, y in zip(s[0::2], s[1::2])
             if int(x + y, 16) != 0xFF]
        )

    @staticmethod
    def s2h(s: str) -> str:
        """Convert ASCII string to hex string.

        Args:
            s (str): ASCII string.

        Returns:
            str: Hex string.
        """
        b = bytearray()
        b.extend(map(ord, s))
        return EncodingUtils.b2h(b)

    @staticmethod
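    def enc_iccid(iccid: str) -> str:
        """Append the Luhn check digit and nibble-swap the ICCID.

        Expects at most 19 digits (ICCID without its check digit); with 20 digits
        the appended check digit is silently dropped by swap_nibbles.
        """
        iccid = str(iccid)
        luhn = EncodingUtils.calculate_luhn(iccid)
        iccid = iccid + str(luhn)
        m_iccid = EncodingUtils.swap_nibbles(EncodingUtils.rpad(iccid, 20))
        return m_iccid.upper()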

    @staticmethod
    def dec_iccid(ef: str) -> str:
        ef = str(ef)
        ef = ef.upper()
        iccid = EncodingUtils.swap_nibbles(ef).strip("F")
        return iccid[:-1]

    @staticmethod
    def enc_imsi(imsi: str) -> str:
        imsi = str(imsi)
        l = EncodingUtils.half_round_up(len(imsi) + 1)
        oe = len(imsi) & 1
        ei = "%02x" % l + EncodingUtils.swap_nibbles("%01x%s" % ((oe << 3) | 1, EncodingUtils.rpad(imsi, 15)))
        return ei

    @staticmethod
    def dec_imsi(ef: str) -> str:
        ef = str(ef)
        if len(ef) < 4:
            return None
        l = int(ef[0:2], 16) * 2
        l = l - 1
        swapped = EncodingUtils.swap_nibbles(ef[2:]).rstrip("f")
        if len(swapped) < 1:
            return None
        oe = (int(swapped[0]) >> 3) & 1
        if not oe:
            l = l - 1
        if l != len(swapped) - 1:
            return None
        imsi = swapped[1:]
        return imsi

    @staticmethod
    def enc_pin_func(pin: str) -> str:
        return EncodingUtils.rpad(EncodingUtils.s2h(pin), 16).upper()

    @staticmethod
    def dec_pin_func(pin: str) -> str:
        return EncodingUtils.h2s(pin).upper()

    @staticmethod
    def integer_2_ascii(x: str) -> str:
        return EncodingUtils.s2h(x)

    @staticmethod
    def ascii_2_integer(x: str) -> str:
        return EncodingUtils.h2s(str(x))

# Register UDFs
enc_iccid_udf = udf(EncodingUtils.enc_iccid, StringType())
dec_iccid_udf = udf(EncodingUtils.dec_iccid, StringType())
enc_imsi_udf = udf(EncodingUtils.enc_imsi, StringType())
dec_imsi_udf = udf(EncodingUtils.dec_imsi, StringType())
enc_pin_func_udf = udf(EncodingUtils.enc_pin_func, StringType())
dec_pin_func_udf = udf(EncodingUtils.dec_pin_func, StringType())
integer_2_ascii_udf = udf(EncodingUtils.integer_2_ascii, StringType())
ascii_2_integer_udf = udf(EncodingUtils.ascii_2_integer, StringType())
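`enc_iccid` and `dec_iccid` round-trip only when the input leaves room for the Luhn digit. The stdlib sketch below repeats their logic. It shows that a 19-digit ICCID survives, while a 20-digit value like the notebook's `89012345678901000000` loses a digit. `zip` in `swap_nibbles` stops at the shorter slice, so the 21st character (the Luhn digit) is dropped. `dec_iccid` then strips what it takes to be the check digit.

```python
def swap_nibbles(s: str) -> str:
    # zip() stops at the shorter slice, so an odd-length input loses its last digit
    return "".join(x + y for x, y in zip(s[1::2], s[0::2]))


def calculate_luhn(cc: str) -> int:
    num = list(map(int, cc))
    check = 10 - sum(num[-2::-2] + [sum(divmod(d * 2, 10)) for d in num[::-2]]) % 10
    return 0 if check == 10 else check


def enc_iccid(iccid: str) -> str:
    # Append the Luhn digit, pad to 20 nibbles with 'f', then swap nibbles
    full = iccid + str(calculate_luhn(iccid))
    return swap_nibbles(full.ljust(20, "f")).upper()


def dec_iccid(ef: str) -> str:
    # Undo the swap, strip padding and drop the trailing Luhn digit
    return swap_nibbles(ef.upper()).strip("F")[:-1]


for iccid in ["8901234567890100000", "89012345678901000000"]:  # 19 and 20 digits
    ef = enc_iccid(iccid)
    print(len(iccid), ef, dec_iccid(ef) == iccid)
# 19 98103254769810000060 True
# 20 98103254769810000000 False
```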


In [10]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("EncodingUtilsExample").getOrCreate()

# Sample DataFrame
#data = [    ("410011112222000", "89012345678901000000", "1234", "5678", "9101", "1122"),]
#schema = ["IMSI", "ICCID", "PIN1", "PUK1", "PIN2", "PUK2"]
#df = spark.createDataFrame(data, schema)

class DataFrameProcessor:
    @staticmethod
    def encode(df):
        if "ICCID" in df.columns:
            df = df.withColumn("ICCID", enc_iccid_udf(df["ICCID"]))
        if "IMSI" in df.columns:
            df = df.withColumn("IMSI", enc_imsi_udf(df["IMSI"]))
        if "PIN1" in df.columns:
            df = df.withColumn("PIN1", enc_pin_func_udf(df["PIN1"]))
        if "PUK1" in df.columns:
            df = df.withColumn("PUK1", integer_2_ascii_udf(df["PUK1"]))
        if "PIN2" in df.columns:
            df = df.withColumn("PIN2", enc_pin_func_udf(df["PIN2"]))
        if "PUK2" in df.columns:
            df = df.withColumn("PUK2", integer_2_ascii_udf(df["PUK2"]))
        return df

    @staticmethod
    def decode(df):
        if "ICCID" in df.columns:
            df = df.withColumn("ICCID", dec_iccid_udf(df["ICCID"]))
        if "IMSI" in df.columns:
            df = df.withColumn("IMSI", dec_imsi_udf(df["IMSI"]))
        if "PIN1" in df.columns:
            df = df.withColumn("PIN1", dec_pin_func_udf(df["PIN1"]))
        if "PUK1" in df.columns:
            df = df.withColumn("PUK1", ascii_2_integer_udf(df["PUK1"]))
        if "PIN2" in df.columns:
            df = df.withColumn("PIN2", dec_pin_func_udf(df["PIN2"]))
        if "PUK2" in df.columns:
            df = df.withColumn("PUK2", ascii_2_integer_udf(df["PUK2"]))
        return df

# Apply encoding
#encoded_df = DataFrameProcessor.encode(df_with_columns)
#encoded_df.show()

# Apply decoding
#decoded_df = DataFrameProcessor.decode(encoded_df)
#decoded_df.show()
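`DataFrameProcessor` stores PINs and PUKs as ASCII hex: each digit character becomes its two-digit ASCII code. PINs are additionally right-padded with `f` to 16 hex digits (8 bytes), and decoding skips `FF` padding bytes. The sketch below reproduces `enc_pin_func` and the `s2h`/`h2s` pair with the standard library only. `enc_pin` is a hypothetical stand-in name.

```python
def s2h(s: str) -> str:
    # Each character becomes its two-digit ASCII code in hex: "1" -> "31"
    return "".join("%02x" % ord(ch) for ch in s)


def h2s(h: str) -> str:
    # Inverse of s2h; 0xFF padding bytes are skipped
    pairs = [h[i:i + 2] for i in range(0, len(h), 2)]
    return "".join(chr(int(p, 16)) for p in pairs if p.upper() != "FF")


def enc_pin(pin: str) -> str:
    # Same as enc_pin_func: ASCII hex, right-padded with 'f' to 16 hex digits
    return s2h(pin).ljust(16, "f").upper()


pin_ef = enc_pin("1234")
puk_ef = s2h("12345678")  # PUKs use integer_2_ascii, i.e. plain s2h without padding
print(pin_ef, h2s(pin_ef))  # 31323334FFFFFFFF 1234
print(puk_ef, h2s(puk_ef))  # 3132333435363738 12345678
```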


In [12]:
def main():
    spark = SparkSession.builder.appName("ExampleApp").getOrCreate()

    # Initial values
    imsi_value = 410011112222000
    iccid_value = 89012345678901000000
    num_rows = 5

    # Create DataFrame
    df_generator = DataFrameGenerator(spark, num_rows)
    df = df_generator.create_dataframe(imsi_value, iccid_value)

    # Register UDFs
    udfs = UDFRegistrar.register_udfs()

    # Apply UDFs and add new columns
    df_with_columns = df.withColumn("PIN1", udfs["pin1_udf"]()) \
                        .withColumn("PUK1", udfs["puk1_udf"]()) \
                        .withColumn("PIN2", udfs["pin2_udf"]()) \
                        .withColumn("PUK2", udfs["puk2_udf"]()) \
                        .withColumn("K4", udfs["k4_udf"]()) \
                        .withColumn("OP", udfs["op_udf"]()) \
                        .withColumn("KI", udfs["ki_udf"]()) \
                        .withColumn("EKI", udfs["eki_udf"](col("K4"), col("KI"))) \
                        .withColumn("OPC", udfs["opc_udf"](col("OP"), col("KI"))) \
                        .withColumn("ACC", udfs["acc_udf"](col("IMSI")))

    # Add the twelve ADM columns (ADM1 .. ADM12), each from adm_udf
    for i in range(1, 13):
        df_with_columns = df_with_columns.withColumn(f"ADM{i}", udfs["adm_udf"]())

    # Apply encoding
    encoded_df = DataFrameProcessor.encode(df_with_columns)
    encoded_df.show(truncate=False)

    # Apply decoding
    decoded_df = DataFrameProcessor.decode(encoded_df)
    decoded_df.show(truncate=False)

    # Show the DataFrame
    # df_with_columns.show(truncate=False)


if __name__ == "__main__":
    main()


+------------------+--------------------+----------------+----------------+----------------+----------------+----------------------------------------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|IMSI              |ICCID               |PIN1            |PUK1            |PIN2            |PUK2            |K4                                                              |OP                              |KI                              |EKI                             |OPC                             |ACC |ADM1    |ADM2    |ADM3    |ADM4    |ADM5    |ADM6    |ADM7    |ADM8    |ADM9    |ADM10   |ADM11   |ADM12   |
+------------------+--------------------+----------------+----------------+----------------+----------------+---------------------------------------------------

In [None]:
df.limit(5)

Car,MPG,Cylinders,Displacement,Horsepower,Weight,Acceleration,Model,Origin
Chevrolet Chevell...,18.0,8,307.0,130.0,3504.0,12.0,70,US
Buick Skylark 320,15.0,8,350.0,165.0,3693.0,11.5,70,US
Plymouth Satellite,18.0,8,318.0,150.0,3436.0,11.0,70,US
AMC Rebel SST,16.0,8,304.0,150.0,3433.0,12.0,70,US
Ford Torino,17.0,8,302.0,140.0,3449.0,10.5,70,US


<a id='viewing-dataframe-columns'></a>
### Viewing Dataframe Columns

In [None]:
df.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

<a id='dataframe-schema'></a>
### Dataframe Schema

There are two methods commonly used to view the data types of a dataframe:

In [None]:
df.dtypes

[('Car', 'string'),
 ('MPG', 'string'),
 ('Cylinders', 'string'),
 ('Displacement', 'string'),
 ('Horsepower', 'string'),
 ('Weight', 'string'),
 ('Acceleration', 'string'),
 ('Model', 'string'),
 ('Origin', 'string')]

In [None]:
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: string (nullable = true)
 |-- Cylinders: string (nullable = true)
 |-- Displacement: string (nullable = true)
 |-- Horsepower: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Acceleration: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Origin: string (nullable = true)



<a id='implicit-schema-inference'></a>
#### Inferring Schema Implicitly

We can use the parameter `inferSchema=True` to infer the input schema automatically while loading the data. An example is shown below:

In [None]:
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: decimal(4,0) (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)



As you can see, Spark inferred the data types automatically and even chose the correct precision for the decimal type (`decimal(4,0)` for `Weight`). Inference is not free: Spark needs an extra pass over the data to work out the types. It can also cause problems when you read multiple files with different schemas, because implicit inference may then lead to null values in some columns. Therefore, let us also see how to define schemas explicitly.

<a id='explicit-schema-inference'></a>
#### Defining Schema Explicitly

In [None]:
from pyspark.sql.types import *
df.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

In [None]:
# Creating a list of the schema in the format column_name, data_type
labels = [
     ('Car',StringType()),
     ('MPG',DoubleType()),
     ('Cylinders',IntegerType()),
     ('Displacement',DoubleType()),
     ('Horsepower',DoubleType()),
     ('Weight',DoubleType()),
     ('Acceleration',DoubleType()),
     ('Model',IntegerType()),
     ('Origin',StringType())
]

In [None]:
# Creating the schema that will be passed when reading the csv
schema = StructType([StructField(name, data_type, True) for name, data_type in labels])
schema

StructType([StructField('Car', StringType(), True), StructField('MPG', DoubleType(), True), StructField('Cylinders', IntegerType(), True), StructField('Displacement', DoubleType(), True), StructField('Horsepower', DoubleType(), True), StructField('Weight', DoubleType(), True), StructField('Acceleration', DoubleType(), True), StructField('Model', IntegerType(), True), StructField('Origin', StringType(), True)])

In [None]:
df = spark.read.csv('cars.csv', header=True, sep=";", schema=schema)
df.printSchema()
# The schema matches the one we supplied

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [None]:
df.show(truncate=False)

+--------------------------------+----+---------+------------+----------+------+------------+-----+------+
|Car                             |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevelle Malibu       |18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |
|Buick Skylark 320               |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |
|Plymouth Satellite              |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |
|AMC Rebel SST                   |16.0|8        |304.0       |150.0     |3433.0|12.0        |70   |US    |
|Ford Torino                     |17.0|8        |302.0       |140.0     |3449.0|10.5        |70   |US    |
|Ford Galaxie 500                |15.0|8        |429.0       |198.0     |4341.0|10.0        |70   |US    |
|Chevrolet Impala                |14.

As we can see here, the data has been successfully loaded with the specified datatypes.

<a id='dataframe-operations-on-columns'></a>
## DataFrame Operations on Columns

We will go over the following in this section:

1.   Selecting Columns
2.   Selecting Multiple Columns
3.   Adding New Columns
4.   Renaming Columns
5.   Grouping By Columns
6.   Removing Columns



<a id='selecting-columns'></a>
### Selecting Columns

There are multiple ways to select a column in PySpark. The cells below show each one and how they differ:

In [None]:
# 1st method
# Column name is case sensitive in this usage
print(df.Car)
print("*"*20)
df.select(df.Car).show(truncate=False)

Column<'Car'>
********************
+--------------------------------+
|Car                             |
+--------------------------------+
|Chevrolet Chevelle Malibu       |
|Buick Skylark 320               |
|Plymouth Satellite              |
|AMC Rebel SST                   |
|Ford Torino                     |
|Ford Galaxie 500                |
|Chevrolet Impala                |
|Plymouth Fury iii               |
|Pontiac Catalina                |
|AMC Ambassador DPL              |
|Citroen DS-21 Pallas            |
|Chevrolet Chevelle Concours (sw)|
|Ford Torino (sw)                |
|Plymouth Satellite (sw)         |
|AMC Rebel SST (sw)              |
|Dodge Challenger SE             |
|Plymouth 'Cuda 340              |
|Ford Mustang Boss 302           |
|Chevrolet Monte Carlo           |
|Buick Estate Wagon (sw)         |
+--------------------------------+
only showing top 20 rows



**NOTE:**

> **We can't always use the dot notation: it only works when the column name is a valid Python identifier (no spaces or special characters) and does not clash with an existing attribute or method of the DataFrame class, such as `count`. Additionally, dot notation is case sensitive, so the name must match the column's exact case, or the column names must first be converted to a consistent case.**



In [None]:
# 2nd method
# Column name is case insensitive here (Spark's default, spark.sql.caseSensitive=false)
print(df['car'])
print("*"*20)
df.select(df['car']).show(truncate=False)

Column<'car'>
********************
+--------------------------------+
|car                             |
+--------------------------------+
|Chevrolet Chevelle Malibu       |
|Buick Skylark 320               |
|Plymouth Satellite              |
|AMC Rebel SST                   |
|Ford Torino                     |
|Ford Galaxie 500                |
|Chevrolet Impala                |
|Plymouth Fury iii               |
|Pontiac Catalina                |
|AMC Ambassador DPL              |
|Citroen DS-21 Pallas            |
|Chevrolet Chevelle Concours (sw)|
|Ford Torino (sw)                |
|Plymouth Satellite (sw)         |
|AMC Rebel SST (sw)              |
|Dodge Challenger SE             |
|Plymouth 'Cuda 340              |
|Ford Mustang Boss 302           |
|Chevrolet Monte Carlo           |
|Buick Estate Wagon (sw)         |
+--------------------------------+
only showing top 20 rows



In [None]:
# 3rd method
# Column name is case insensitive here
from pyspark.sql.functions import col
df.select(col('car')).show(truncate=False)

+--------------------------------+
|car                             |
+--------------------------------+
|Chevrolet Chevelle Malibu       |
|Buick Skylark 320               |
|Plymouth Satellite              |
|AMC Rebel SST                   |
|Ford Torino                     |
|Ford Galaxie 500                |
|Chevrolet Impala                |
|Plymouth Fury iii               |
|Pontiac Catalina                |
|AMC Ambassador DPL              |
|Citroen DS-21 Pallas            |
|Chevrolet Chevelle Concours (sw)|
|Ford Torino (sw)                |
|Plymouth Satellite (sw)         |
|AMC Rebel SST (sw)              |
|Dodge Challenger SE             |
|Plymouth 'Cuda 340              |
|Ford Mustang Boss 302           |
|Chevrolet Monte Carlo           |
|Buick Estate Wagon (sw)         |
+--------------------------------+
only showing top 20 rows



<a id='selecting-multiple-columns'></a>
### Selecting Multiple Columns

In [None]:
# 1st method
# Column name is case sensitive in this usage
print(df.Car, df.Cylinders)
print("*"*40)
df.select(df.Car, df.Cylinders).show(truncate=False)

Column<'Car'> Column<'Cylinders'>
****************************************
+--------------------------------+---------+
|Car                             |Cylinders|
+--------------------------------+---------+
|Chevrolet Chevelle Malibu       |8        |
|Buick Skylark 320               |8        |
|Plymouth Satellite              |8        |
|AMC Rebel SST                   |8        |
|Ford Torino                     |8        |
|Ford Galaxie 500                |8        |
|Chevrolet Impala                |8        |
|Plymouth Fury iii               |8        |
|Pontiac Catalina                |8        |
|AMC Ambassador DPL              |8        |
|Citroen DS-21 Pallas            |4        |
|Chevrolet Chevelle Concours (sw)|8        |
|Ford Torino (sw)                |8        |
|Plymouth Satellite (sw)         |8        |
|AMC Rebel SST (sw)              |8        |
|Dodge Challenger SE             |8        |
|Plymouth 'Cuda 340              |8        |
|Ford Mustang Boss 302   

In [None]:
# 2nd method
# Column name is case insensitive in this usage
print(df['car'],df['cylinders'])
print("*"*40)
df.select(df['car'],df['cylinders']).show(truncate=False)

Column<'car'> Column<'cylinders'>
****************************************
+--------------------------------+---------+
|car                             |cylinders|
+--------------------------------+---------+
|Chevrolet Chevelle Malibu       |8        |
|Buick Skylark 320               |8        |
|Plymouth Satellite              |8        |
|AMC Rebel SST                   |8        |
|Ford Torino                     |8        |
|Ford Galaxie 500                |8        |
|Chevrolet Impala                |8        |
|Plymouth Fury iii               |8        |
|Pontiac Catalina                |8        |
|AMC Ambassador DPL              |8        |
|Citroen DS-21 Pallas            |4        |
|Chevrolet Chevelle Concours (sw)|8        |
|Ford Torino (sw)                |8        |
|Plymouth Satellite (sw)         |8        |
|AMC Rebel SST (sw)              |8        |
|Dodge Challenger SE             |8        |
|Plymouth 'Cuda 340              |8        |
|Ford Mustang Boss 302   

In [None]:
# 3rd method
# Column name is case insensitive in this usage
from pyspark.sql.functions import col
df.select(col('car'),col('cylinders')).show(truncate=False)

+--------------------------------+---------+
|car                             |cylinders|
+--------------------------------+---------+
|Chevrolet Chevelle Malibu       |8        |
|Buick Skylark 320               |8        |
|Plymouth Satellite              |8        |
|AMC Rebel SST                   |8        |
|Ford Torino                     |8        |
|Ford Galaxie 500                |8        |
|Chevrolet Impala                |8        |
|Plymouth Fury iii               |8        |
|Pontiac Catalina                |8        |
|AMC Ambassador DPL              |8        |
|Citroen DS-21 Pallas            |4        |
|Chevrolet Chevelle Concours (sw)|8        |
|Ford Torino (sw)                |8        |
|Plymouth Satellite (sw)         |8        |
|AMC Rebel SST (sw)              |8        |
|Dodge Challenger SE             |8        |
|Plymouth 'Cuda 340              |8        |
|Ford Mustang Boss 302           |8        |
|Chevrolet Monte Carlo           |8        |
|Buick Est

<a id='adding-new-columns'></a>
### Adding New Columns

We will take a look at three cases here:

1.   Adding a new column
2.   Adding multiple columns
3.   Deriving a new column from an existing one

In [None]:
# CASE 1: Adding a new column
# We will add a new column called 'first_column' at the end
from pyspark.sql.functions import lit
df = df.withColumn('first_column',lit(1))
# lit means literal: it fills every row with the given constant value.
# withColumn expects a Column, so constant values must be wrapped in lit().
df.show(5,truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|first_column|
+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |1           |
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |1           |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |1           |
|AMC Rebel SST            |16.0|8        |304.0       |150.0     |3433.0|12.0        |70   |US    |1           |
|Ford Torino              |17.0|8        |302.0       |140.0     |3449.0|10.5        |70   |US    |1           |
+-------------------------+----+---------+------------+----------+------+------------+-----+----

In [None]:
# CASE 2: Adding multiple columns
# We will add two new columns called 'second_column' and 'third_column' at the end
df = df.withColumn('second_column', lit(2)) \
       .withColumn('third_column', lit('Third Column'))
# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.
df.show(5,truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+-------------+------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|first_column|second_column|third_column|
+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+-------------+------------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |1           |2            |Third Column|
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |1           |2            |Third Column|
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |1           |2            |Third Column|
|AMC Rebel SST            |16.0|8        |304.0       |150.0     |3433.0|12.0        |70   |US    |1           |2            |Third Column|
|Ford Torino        

In [None]:
# CASE 3: Deriving a new column from an existing one
# We will add a new column called 'car_model' which has the value of car and model appended together with a space in between
from pyspark.sql.functions import concat
df = df.withColumn('car_model', concat(col("Car"), lit(" "), col("model")))
# lit(" ") turns the separator string into a literal Column that concat can combine with the other columns
df.show(5,truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+-------------+------------+----------------------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|first_column|second_column|third_column|car_model                   |
+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+-------------+------------+----------------------------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |1           |2            |Third Column|Chevrolet Chevelle Malibu 70|
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |1           |2            |Third Column|Buick Skylark 320 70        |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |1           |2            |Third Column|Plymouth Satelli

As we can see, the new column `car_model` has been created from existing columns. Since our aim was a column holding the car and model values joined by a space, we used the `concat` function.

<a id='renaming-columns'></a>
### Renaming Columns

We use the `withColumnRenamed` function to rename a column in PySpark. Let us see it in action below:

In [None]:
#Renaming a column in PySpark
df = df.withColumnRenamed('first_column', 'new_column_one') \
       .withColumnRenamed('second_column', 'new_column_two') \
       .withColumnRenamed('third_column', 'new_column_three')
df.show(truncate=False)

+--------------------------------+----+---------+------------+----------+------+------------+-----+------+--------------+--------------+----------------+-----------------------------------+
|Car                             |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|new_column_one|new_column_two|new_column_three|car_model                          |
+--------------------------------+----+---------+------------+----------+------+------------+-----+------+--------------+--------------+----------------+-----------------------------------+
|Chevrolet Chevelle Malibu       |18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |1             |2             |Third Column    |Chevrolet Chevelle Malibu 70       |
|Buick Skylark 320               |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |1             |2             |Third Column    |Buick Skylark 320 70               |
|Plymouth Satellite              |18.0|8        |3

<a id='grouping-by-columns'></a>
### Grouping By Columns

Here, we see the Dataframe API way of grouping values. We will discuss how to:


1.   Group By a single column
2.   Group By multiple columns

In [None]:
# Group By a column in PySpark
df.groupBy('Origin').count().show(5)

+------+-----+
|Origin|count|
+------+-----+
|Europe|   73|
|    US|  254|
| Japan|   79|
+------+-----+



In [None]:
# Group By multiple columns in PySpark
df.groupBy('Origin', 'Model').count().show(5)

+------+-----+-----+
|Origin|Model|count|
+------+-----+-----+
|Europe|   71|    5|
|Europe|   80|    9|
|Europe|   79|    4|
| Japan|   75|    4|
|    US|   72|   18|
+------+-----+-----+
only showing top 5 rows



<a id='removing-columns'></a>
### Removing Columns

In [None]:
#Remove columns in PySpark
df = df.drop('new_column_one')
df.show(5,truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+--------------+----------------+----------------------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|new_column_two|new_column_three|car_model                   |
+-------------------------+----+---------+------------+----------+------+------------+-----+------+--------------+----------------+----------------------------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |2             |Third Column    |Chevrolet Chevelle Malibu 70|
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |2             |Third Column    |Buick Skylark 320 70        |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |2             |Third Column    |Plymouth Satellite 70       |
|AMC Rebel SST            |16.0|8 

In [None]:
#Remove multiple columns in one go
df = df.drop('new_column_two', 'new_column_three')
df.show(5,truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+----------------------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|car_model                   |
+-------------------------+----+---------+------------+----------+------+------------+-----+------+----------------------------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |Chevrolet Chevelle Malibu 70|
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |Buick Skylark 320 70        |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |Plymouth Satellite 70       |
|AMC Rebel SST            |16.0|8        |304.0       |150.0     |3433.0|12.0        |70   |US    |AMC Rebel SST 70            |
|Ford Torino              |17.0|8        |302.0       |140.0     |3449.0|10.5        |70   |US   

<a id='dataframe-operations-on-rows'></a>
## DataFrame Operations on Rows

We will discuss the following in this section:

1.   Filtering Rows
2.   Get Distinct Rows
3.   Sorting Rows
4.   Union Dataframes



<a id='filtering-rows'></a>
### Filtering Rows

In [None]:
# Filtering rows in PySpark
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count))
europe_filtered_count = df.filter(col('Origin')=='Europe').count()
print("EUROPE FILTERED RECORD COUNT: " + str(europe_filtered_count))
df.filter(col('Origin')=='Europe').show(truncate=False)

TOTAL RECORD COUNT: 406
EUROPE FILTERED RECORD COUNT: 73
+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Car                         |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|car_model                      |
+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Citroen DS-21 Pallas        |0.0 |4        |133.0       |115.0     |3090.0|17.5        |70   |Europe|Citroen DS-21 Pallas 70        |
|Volkswagen 1131 Deluxe Sedan|26.0|4        |97.0        |46.0      |1835.0|20.5        |70   |Europe|Volkswagen 1131 Deluxe Sedan 70|
|Peugeot 504                 |25.0|4        |110.0       |87.0      |2672.0|17.5        |70   |Europe|Peugeot 504 70                 |
|Audi 100 LS                 |24.0|4        |107.0       |90.0      |2430.0|14.5        |70   |Europe|Audi 100 LS 70                 

In [None]:
# Filtering rows in PySpark based on Multiple conditions
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count))
europe_filtered_count = df.filter((col('Origin')=='Europe') &
                                  (col('Cylinders')==4)).count() # Two conditions added here
print("EUROPE FILTERED RECORD COUNT: " + str(europe_filtered_count))
df.filter((col('Origin')=='Europe') & (col('Cylinders')==4)).show(truncate=False)

TOTAL RECORD COUNT: 406
EUROPE FILTERED RECORD COUNT: 66
+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Car                         |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|car_model                      |
+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Citroen DS-21 Pallas        |0.0 |4        |133.0       |115.0     |3090.0|17.5        |70   |Europe|Citroen DS-21 Pallas 70        |
|Volkswagen 1131 Deluxe Sedan|26.0|4        |97.0        |46.0      |1835.0|20.5        |70   |Europe|Volkswagen 1131 Deluxe Sedan 70|
|Peugeot 504                 |25.0|4        |110.0       |87.0      |2672.0|17.5        |70   |Europe|Peugeot 504 70                 |
|Audi 100 LS                 |24.0|4        |107.0       |90.0      |2430.0|14.5        |70   |Europe|Audi 100 LS 70                 

<a id='get-distinct-rows'></a>
### Get Distinct Rows

In [None]:
#Get Unique Rows in PySpark
df.select('Origin').distinct().show()

+------+
|Origin|
+------+
|Europe|
|    US|
| Japan|
+------+



In [None]:
#Get Unique Rows in PySpark based on multiple columns
df.select('Origin','model').distinct().show()

+------+-----+
|Origin|model|
+------+-----+
|Europe|   71|
|Europe|   80|
|Europe|   79|
| Japan|   75|
|    US|   72|
|    US|   80|
|Europe|   74|
| Japan|   79|
|Europe|   76|
|    US|   75|
| Japan|   77|
|    US|   82|
| Japan|   80|
| Japan|   78|
|    US|   78|
|Europe|   75|
|    US|   71|
|    US|   77|
| Japan|   70|
| Japan|   71|
+------+-----+
only showing top 20 rows



<a id='sorting-rows'></a>
### Sorting Rows

In [None]:
# Sort Rows in PySpark
# By default the data will be sorted in ascending order
df.orderBy('Cylinders').show(truncate=False)

+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Car                         |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|car_model                      |
+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Mazda RX2 Coupe             |19.0|3        |70.0        |97.0      |2330.0|13.5        |72   |Japan |Mazda RX2 Coupe 72             |
|Mazda RX3                   |18.0|3        |70.0        |90.0      |2124.0|13.5        |73   |Japan |Mazda RX3 73                   |
|Mazda RX-4                  |21.5|3        |80.0        |110.0     |2720.0|13.5        |77   |Japan |Mazda RX-4 77                  |
|Mazda RX-7 GS               |23.7|3        |70.0        |100.0     |2420.0|12.5        |80   |Japan |Mazda RX-7 GS 80               |
|Datsun 510 (sw)             |28.0|4        |97.0      

In [None]:
# To change the sorting order, you can use the ascending parameter
df.orderBy('Cylinders', ascending=False).show(truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+----------------------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|car_model                   |
+-------------------------+----+---------+------------+----------+------+------------+-----+------+----------------------------+
|Plymouth 'Cuda 340       |14.0|8        |340.0       |160.0     |3609.0|8.0         |70   |US    |Plymouth 'Cuda 340 70       |
|Pontiac Safari (sw)      |13.0|8        |400.0       |175.0     |5140.0|12.0        |71   |US    |Pontiac Safari (sw) 71      |
|Ford Mustang Boss 302    |0.0 |8        |302.0       |140.0     |3353.0|8.0         |70   |US    |Ford Mustang Boss 302 70    |
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |Buick Skylark 320 70        |
|Chevrolet Monte Carlo    |15.0|8        |400.0       |150.0     |3761.0|9.5         |70   |US   

In [None]:
# Using groupBy and orderBy together
df.groupBy("Origin").count().orderBy('count', ascending=False).show(10)

+------+-----+
|Origin|count|
+------+-----+
|    US|  254|
| Japan|   79|
|Europe|   73|
+------+-----+



<a id='union-dataframes'></a>
### Union Dataframes

You will see three main methods for performing union of dataframes. It is important to know the difference between them and which one is preferred:

*   `union()` – Merges two DataFrames with the same structure/schema. It resolves columns by position, not by name, so both DataFrames must have the same columns in the same order; if the column counts differ, it raises an error.
*   `unionAll()` – An older name for the same operation. It was deprecated in Spark 2.0.0 in favor of `union()`.
*   `unionByName()` – Merges two DataFrames by matching column names, so the column order may differ.

> Since `unionAll()` was deprecated, **`union()` is the preferred method for merging dataframes.**
<br>
> The difference between `unionByName()` and `union()` is that `unionByName()` resolves columns by name, not by position.

In standard SQL, `UNION` eliminates duplicate rows while `UNION ALL` keeps them. In PySpark, however, both `union()` and `unionAll()` behave like `UNION ALL` and keep duplicate records. To remove duplicates, follow the union with `distinct()` or `dropDuplicates()`.

In [None]:
# CASE 1: Union When columns are in order
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
europe_cars = df.filter((col('Origin')=='Europe') & (col('Cylinders')==5))
japan_cars = df.filter((col('Origin')=='Japan') & (col('Cylinders')==3))
print("EUROPE CARS: "+str(europe_cars.count()))
print("JAPAN CARS: "+str(japan_cars.count()))
print("AFTER UNION: "+str(europe_cars.union(japan_cars).count()))

EUROPE CARS: 3
JAPAN CARS: 4
AFTER UNION: 7


**Result:**

> As you can see here, there were 3 cars from Europe with 5 Cylinders, and 4 cars from Japan with 3 Cylinders. After union, there are 7 cars in total.



In [None]:
# CASE 2: Union when columns are not in order
# Creating two dataframes with jumbled columns
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+



**Result:**

> As you can see here, the two dataframes have been successfully merged based on their column names.



<a id='common-data-manipulation-functions'></a>
## Common Data Manipulation Functions

In [None]:
# Functions available in PySpark
from pyspark.sql import functions
# Similar to Python, we can use the dir function to view the available functions
print(dir(functions))



<a id='string-functions'></a>
### String Functions

In [None]:
# Loading the data
from pyspark.sql.functions import col
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)

**Display the Car column as-is, in lowercase, in uppercase, and as its first 4 characters**

In [None]:
from pyspark.sql.functions import col, lower, upper, substring
# Prints out the details of a function
help(substring)
# alias is used to rename the column name in the output
df.select(col('Car'), lower(col('Car')), upper(col('Car')), substring(col('Car'), 1, 4).alias("first_4_chars")).show(5, False)

Help on function substring in module pyspark.sql.functions:

substring(str: 'ColumnOrName', pos: int, len: int) -> pyspark.sql.column.Column
    Substring starts at `pos` and is of length `len` when str is String type or
    returns the slice of byte array that starts at `pos` in byte and is of length `len`
    when str is Binary type.
    
    .. versionadded:: 1.5.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Notes
    -----
    The position is not zero based, but 1 based index.
    
    Parameters
    ----------
    str : :class:`~pyspark.sql.Column` or str
        target column to work on.
    pos : int
        starting position in str.
    len : int
        length of chars.
    
    Returns
    -------
    :class:`~pyspark.sql.Column`
        substring of given value.
    
    Examples
    --------
    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(substring(df.s, 1, 2).alias('s')).collect()
    [Row(s='ab')]


**Concatenate the Car column and Model column and add a space between them.**

In [None]:
from pyspark.sql.functions import col, concat, lit
# lit(" ") wraps the space in a literal column so it can be concatenated
df.select(col("Car"), col("model"), concat(col("Car"), lit(" "), col("model"))).show(5, False)

+-------------------------+-----+----------------------------+
|Car                      |model|concat(Car,  , model)       |
+-------------------------+-----+----------------------------+
|Chevrolet Chevelle Malibu|70   |Chevrolet Chevelle Malibu 70|
|Buick Skylark 320        |70   |Buick Skylark 320 70        |
|Plymouth Satellite       |70   |Plymouth Satellite 70       |
|AMC Rebel SST            |70   |AMC Rebel SST 70            |
|Ford Torino              |70   |Ford Torino 70              |
+-------------------------+-----+----------------------------+
only showing top 5 rows



<a id='numeric-functions'></a>
### Numeric functions

**Show the minimum and maximum weight**

In [None]:
# Note: importing min and max this way shadows Python's built-in min() and max()
from pyspark.sql.functions import min, max
df.select(min(col('Weight')), max(col('Weight'))).show()

+-----------+-----------+
|min(Weight)|max(Weight)|
+-----------+-----------+
|       1613|       5140|
+-----------+-----------+



**Add 10 to the minimum and maximum weight**

In [None]:
from pyspark.sql.functions import min, max, lit
# For a constant offset, adding 10 before or after the aggregation gives the same result
df.select(min(col('Weight'))+lit(10), max(col('Weight')+lit(10))).show()

+------------------+------------------+
|(min(Weight) + 10)|max((Weight + 10))|
+------------------+------------------+
|              1623|              5150|
+------------------+------------------+



<a id='operations-on-date'></a>
### Operations on Date

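> [Before Spark 3.0, PySpark followed Java's SimpleDateFormat pattern table. Click here to view those docs.](https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html) Since Spark 3.0, Spark uses its own datetime patterns (based on Java's `DateTimeFormatter`), documented on the "Datetime Patterns for Formatting and Parsing" page of the Spark SQL reference. Common letters such as `yyyy`, `MM`, `MMM`, `dd`, `HH`, `mm` and `ss` have the same meaning in both.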

In [None]:
from pyspark.sql.functions import to_date, to_timestamp, lit
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df.show()
df.printSchema()

+-------------------+
|                DOB|
+-------------------+
|2019-12-25 13:30:00|
+-------------------+

root
 |-- DOB: string (nullable = true)



In [None]:
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'),'yyyy-MM-dd HH:mm:ss'), to_timestamp(col('DOB'),'yyyy-MM-dd HH:mm:ss'))
df.show()
df.printSchema()

+---------------------------------+--------------------------------------+
|to_date(DOB, yyyy-MM-dd HH:mm:ss)|to_timestamp(DOB, yyyy-MM-dd HH:mm:ss)|
+---------------------------------+--------------------------------------+
|                       2019-12-25|                   2019-12-25 13:30:00|
+---------------------------------+--------------------------------------+

root
 |-- to_date(DOB, yyyy-MM-dd HH:mm:ss): date (nullable = true)
 |-- to_timestamp(DOB, yyyy-MM-dd HH:mm:ss): timestamp (nullable = true)



In [None]:
df = spark.createDataFrame([('25/Dec/2019 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'),'dd/MMM/yyyy HH:mm:ss'), to_timestamp(col('DOB'),'dd/MMM/yyyy HH:mm:ss'))
df.show()
df.printSchema()

+----------------------------------+---------------------------------------+
|to_date(DOB, dd/MMM/yyyy HH:mm:ss)|to_timestamp(DOB, dd/MMM/yyyy HH:mm:ss)|
+----------------------------------+---------------------------------------+
|                        2019-12-25|                    2019-12-25 13:30:00|
+----------------------------------+---------------------------------------+

root
 |-- to_date(DOB, dd/MMM/yyyy HH:mm:ss): date (nullable = true)
 |-- to_timestamp(DOB, dd/MMM/yyyy HH:mm:ss): timestamp (nullable = true)



**What is 3 days earlier than the oldest date and 3 days later than the most recent date?**

In [None]:
from pyspark.sql.functions import col, date_add, date_sub, min, max
# create a dummy dataframe
df = spark.createDataFrame([('1990-01-01',),('1995-01-03',),('2021-03-30',)], ['Date'])
# find out the required dates
df.select(date_add(max(col('Date')),3), date_sub(min(col('Date')),3)).show()

+----------------------+----------------------+
|date_add(max(Date), 3)|date_sub(min(Date), 3)|
+----------------------+----------------------+
|            2021-04-02|            1989-12-29|
+----------------------+----------------------+



<a id='joins-in-pyspark'></a>
## Joins in PySpark

In [None]:
# Create two dataframes
cars_df = spark.createDataFrame([[1, 'Car A'],[2, 'Car B'],[3, 'Car C']], ["id", "car_name"])
car_price_df = spark.createDataFrame([[1, 1000],[2, 2000],[3, 3000]], ["id", "car_price"])
cars_df.show()
car_price_df.show()

+---+--------+
| id|car_name|
+---+--------+
|  1|   Car A|
|  2|   Car B|
|  3|   Car C|
+---+--------+

+---+---------+
| id|car_price|
+---+---------+
|  1|     1000|
|  2|     2000|
|  3|     3000|
+---+---------+



In [None]:
# Executing an inner join so we can see the id, name and price of each car in one row
cars_df.join(car_price_df, cars_df.id == car_price_df.id, 'inner').select(cars_df['id'],cars_df['car_name'],car_price_df['car_price']).show(truncate=False)

+---+--------+---------+
|id |car_name|car_price|
+---+--------+---------+
|1  |Car A   |1000     |
|2  |Car B   |2000     |
|3  |Car C   |3000     |
+---+--------+---------+



As you can see, we have done an inner join between two dataframes. The following joins are supported by PySpark:
1. inner (default)
2. cross
3. outer
4. full
5. full_outer
6. left
7. left_outer
8. right
9. right_outer
10. left_semi
11. left_anti

<a id='spark-sql'></a>
## Spark SQL

SQL has been around since the 1970s, so one can imagine how many people have made it their bread and butter. As big data grew in popularity, there was a shortage of professionals with the technical knowledge to deal with it. This led to the creation of Spark SQL. To quote the docs:<br>
>Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.

Basically, what you need to know is that Spark SQL is used to execute SQL queries on big data. Spark SQL can also be used to read data from Hive tables and views. Let me explain Spark SQL with an example.


In [None]:
# Load data
df = spark.read.csv('cars.csv', header=True, sep=";")
# Register Temporary Table
df.createOrReplaceTempView("temp")
# Select all data from temp table
spark.sql("select * from temp limit 5").show()
# Select count of data in table
spark.sql("select count(*) as total_count from temp").show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+

+-----------+
|total_count|
+-----------+
|        406|
+-----------+



As you can see, we registered the dataframe as a temporary view and then ran basic SQL queries on it. How amazing is that?!<br>
If you are a person who is more comfortable with SQL, then this feature is truly a blessing for you! But this raises a question:
> *Should I just keep using Spark SQL all the time?*

And the answer is, _**it depends**_.<br>
In short, different operations behave differently, and depending on what you are trying to do, the SQL and DataFrame versions can differ in execution speed. Both are compiled by the same Catalyst optimizer, and as Spark improves the gap keeps shrinking, so hopefully the difference is only a small margin. Plenty of analyses have been done on this, but none gives a definite answer yet. You can read this [comparative study done by Hortonworks](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547) or the answers to this [stackoverflow question](https://stackoverflow.com/questions/45430816/writing-sql-vs-using-dataframe-apis-in-spark-sql) if you are still curious about it.

<a id='rdd'></a>
## RDD

> With map, you define a function and apply it to each record. flatMap returns a new RDD by first applying a function to all of the elements of the RDD and then flattening the result. filter returns a new RDD containing only the elements that satisfy a condition. With reduce, you combine elements two at a time to produce a single result.
For example, say you have a set of numbers. You can reduce it to its sum by providing a function that takes two values as input and combines them into one.

Some of the reasons you would use a dataframe over an RDD are:
1. Its ability to represent data as rows and columns. This also means it can only hold structured and semi-structured data.
2. It can process data in different formats (Avro, CSV, JSON) and from different storage systems (HDFS, Hive tables, MySQL).
3. Its superior query optimization (through the Catalyst optimizer).
4. The DataFrame API is very easy to use.





In [None]:
cars = spark.sparkContext.textFile('cars.csv')
print(cars.first())
cars_header = cars.first()
cars_rest = cars.filter(lambda line: line!=cars_header)
print(cars_rest.first())

Car;MPG;Cylinders;Displacement;Horsepower;Weight;Acceleration;Model;Origin
Chevrolet Chevelle Malibu;18.0;8;307.0;130.0;3504.;12.0;70;US


**How many cars are there in our csv data?**

In [None]:
cars_rest.map(lambda line: line.split(";")).count()

406

**Display the Car name, MPG, Cylinders, Weight and Origin for the cars Originating in Europe**

In [None]:
# Split each line once; field indices: 0 = Car, 1 = MPG, 2 = Cylinders, 5 = Weight, 8 = Origin
(cars_rest.map(lambda line: line.split(";"))
    .filter(lambda fields: fields[8] == 'Europe')
    .map(lambda fields: (fields[0], fields[1], fields[2], fields[5], fields[8]))
    .collect())

[('Citroen DS-21 Pallas', '0', '4', '3090.', 'Europe'),
 ('Volkswagen 1131 Deluxe Sedan', '26.0', '4', '1835.', 'Europe'),
 ('Peugeot 504', '25.0', '4', '2672.', 'Europe'),
 ('Audi 100 LS', '24.0', '4', '2430.', 'Europe'),
 ('Saab 99e', '25.0', '4', '2375.', 'Europe'),
 ('BMW 2002', '26.0', '4', '2234.', 'Europe'),
 ('Volkswagen Super Beetle 117', '0', '4', '1978.', 'Europe'),
 ('Opel 1900', '28.0', '4', '2123.', 'Europe'),
 ('Peugeot 304', '30.0', '4', '2074.', 'Europe'),
 ('Fiat 124B', '30.0', '4', '2065.', 'Europe'),
 ('Volkswagen Model 111', '27.0', '4', '1834.', 'Europe'),
 ('Volkswagen Type 3', '23.0', '4', '2254.', 'Europe'),
 ('Volvo 145e (sw)', '18.0', '4', '2933.', 'Europe'),
 ('Volkswagen 411 (sw)', '22.0', '4', '2511.', 'Europe'),
 ('Peugeot 504 (sw)', '21.0', '4', '2979.', 'Europe'),
 ('Renault 12 (sw)', '26.0', '4', '2189.', 'Europe'),
 ('Volkswagen Super Beetle', '26.0', '4', '1950.', 'Europe'),
 ('Fiat 124 Sport Coupe', '26.0', '4', '2265.', 'Europe'),
 ('Fiat 128', '29

**Display the Car name, MPG, Cylinders, Weight and Origin for the cars Originating in either Europe or Japan**

In [None]:
# Split each line once; field indices: 0 = Car, 1 = MPG, 2 = Cylinders, 5 = Weight, 8 = Origin
(cars_rest.map(lambda line: line.split(";"))
    .filter(lambda fields: fields[8] in ['Europe', 'Japan'])
    .map(lambda fields: (fields[0], fields[1], fields[2], fields[5], fields[8]))
    .collect())

[('Citroen DS-21 Pallas', '0', '4', '3090.', 'Europe'),
 ('Toyota Corolla Mark ii', '24.0', '4', '2372.', 'Japan'),
 ('Datsun PL510', '27.0', '4', '2130.', 'Japan'),
 ('Volkswagen 1131 Deluxe Sedan', '26.0', '4', '1835.', 'Europe'),
 ('Peugeot 504', '25.0', '4', '2672.', 'Europe'),
 ('Audi 100 LS', '24.0', '4', '2430.', 'Europe'),
 ('Saab 99e', '25.0', '4', '2375.', 'Europe'),
 ('BMW 2002', '26.0', '4', '2234.', 'Europe'),
 ('Datsun PL510', '27.0', '4', '2130.', 'Japan'),
 ('Toyota Corolla', '25.0', '4', '2228.', 'Japan'),
 ('Volkswagen Super Beetle 117', '0', '4', '1978.', 'Europe'),
 ('Opel 1900', '28.0', '4', '2123.', 'Europe'),
 ('Peugeot 304', '30.0', '4', '2074.', 'Europe'),
 ('Fiat 124B', '30.0', '4', '2065.', 'Europe'),
 ('Toyota Corolla 1200', '31.0', '4', '1773.', 'Japan'),
 ('Datsun 1200', '35.0', '4', '1613.', 'Japan'),
 ('Volkswagen Model 111', '27.0', '4', '1834.', 'Europe'),
 ('Toyota Corolla Hardtop', '24.0', '4', '2278.', 'Japan'),
 ('Volkswagen Type 3', '23.0', '4', '

<a id='user-defined-functions-udf'></a>
## User-Defined Functions (UDF)

PySpark User-Defined Functions (UDFs) let you turn your Python code into a scalable version of itself. They come in handy more often than you might imagine, but beware: they perform worse than PySpark's built-in functions. You can view examples of how UDFs work [here](https://docs.databricks.com/spark/latest/spark-sql/udf-python.html). In this section, I will cover some theory on how they work and why they are slower.

When you run a UDF in PySpark, each executor starts a Python worker process. Data has to be serialized and deserialized between the executor's JVM and the Python worker. This adds a lot of overhead to Spark jobs, making UDFs less efficient than built-in dataframe functions. Apart from this, you might sometimes run into memory issues while using UDFs: the Python workers consume off-heap memory, which can exceed the memoryOverhead allocated to the container and make your job fail. Keeping these points in mind, I wouldn't recommend using them, but at the end of the day, it's your choice.

<a id='common-questions'></a>
# Common Questions

<a id='recommended-ide'></a>
## Recommended IDE

I personally prefer [PyCharm](https://www.jetbrains.com/pycharm/) while coding in Python/PySpark. It's based on IntelliJ IDEA so it has a lot of features! And the main advantage I have felt is the ease of installing PySpark and other packages. You can customize it with themes and plugins, and it lets you enhance productivity while coding by providing some features like suggestions, local VCS etc.

<a id='submitting-a-spark-job'></a>
## Submitting a Spark Job

The usual syntax for running a Python script is: `python <file_name>.py <arg1> <arg2> ...`
<br>But to submit a Spark job, you have to use spark-submit to run the application.

Here is a simple example of a spark-submit command:
`spark-submit filename.py --named_argument 'argument value'`<br>
Here, named_argument is an argument that your script reads from the command line.
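Inside the script, the argument can be read with Python's standard `argparse` module. A minimal sketch (the function name `parse_args` is hypothetical; `--named_argument` matches the example command):

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Hypothetical helper that reads the arguments given after filename.py."""
    parser = argparse.ArgumentParser(description="Example PySpark job")
    parser.add_argument("--named_argument", required=True,
                        help="value passed on the spark-submit command line")
    # With argv=None, argparse reads sys.argv[1:]
    return parser.parse_args(argv)


# In filename.py you would call it once at the start of the job:
# args = parse_args()
# print(args.named_argument)
```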

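There are other options you can pass in the command (they must come before the file name), like:<br>
`--py-files`, which ships additional .py, .zip or .egg files and places them on the PYTHONPATH so your script can import them,<br>
`--files`, which ships other files, like txt or config files, to the working directory of each executor,<br>
`--deploy-mode`, which tells whether to launch the driver on a worker node inside the cluster (`cluster`) or locally on the machine you submit from (`client`),<br>
`--conf`, which passes arbitrary Spark configurations in `key=value` form, like memoryOverhead, dynamicAllocation etc.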

There is an [entire page](https://spark.apache.org/docs/latest/submitting-applications.html) in spark documentation dedicated to this. I highly recommend you go through it once.

<a id='creating-dataframes'></a>
## Creating Dataframes

When getting started with dataframes, the most common question is: *'How do I create a dataframe?'* <br>
Below, you can see how to create three kinds of dataframes:

### Create a totally empty dataframe

In [None]:
from pyspark.sql.types import StructType
sc = spark.sparkContext
#Create empty df
schema = StructType([])
empty = spark.createDataFrame(sc.emptyRDD(), schema)
empty.show()

++
||
++
++



### Create an empty dataframe with header

In [None]:
from pyspark.sql.types import StructType, StructField, StringType
#Create empty df with header
schema_header = StructType([StructField("name", StringType(), True)])
empty_with_header = spark.createDataFrame(sc.emptyRDD(), schema_header)
empty_with_header.show()

+----+
|name|
+----+
+----+



### Create a dataframe with header and data

In [None]:
from pyspark.sql import Row
mylist = [
  {"name":'Alice',"age":13},
  {"name":'Jacob',"age":24},
  {"name":'Betty',"age":135},
]
spark.createDataFrame(Row(**x) for x in mylist).show()

+-----+---+
| name|age|
+-----+---+
|Alice| 13|
|Jacob| 24|
|Betty|135|
+-----+---+



In [None]:
# You can achieve the same using this - note that we are using spark context here, not a spark session
from pyspark.sql import Row
df = sc.parallelize([
        Row(name='Alice', age=13),
        Row(name='Jacob', age=24),
        Row(name='Betty', age=135)]).toDF()
df.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 13|
|Jacob| 24|
|Betty|135|
+-----+---+



<a id='drop-duplicates'></a>
## Drop Duplicates

As mentioned earlier, there are two easy ways to remove duplicates from a dataframe. We have already seen the usage of `distinct()` in the <a href="#get-distinct-rows">Get Distinct Rows</a> section.
Here, I will explain how to use the `dropDuplicates()` function to achieve the same.

> `drop_duplicates()` is an alias for `dropDuplicates()`

In [None]:
from pyspark.sql import Row
mylist = [
  {"name":'Alice',"age":5,"height":80},
  {"name":'Jacob',"age":24,"height":80},
  {"name":'Alice',"age":5,"height":80}
]
df = spark.createDataFrame(Row(**x) for x in mylist)
df.dropDuplicates().show()

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Jacob| 24|    80|
+-----+---+------+



`dropDuplicates()` can also take an optional parameter called *subset*, which specifies the columns on which the duplicate check is done. Note that Spark does not guarantee which of the matching rows is kept.

In [None]:
df.dropDuplicates(subset=['height']).show()

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
+-----+---+------+



<a id='fine-tuning-a-pyspark-job'></a>
## Fine Tuning a Spark Job

Before we begin, please note that this entire section is written purely based on experience. It might differ with use cases, but it will help you get a better understanding of what you should be looking for, or act as a guidance to achieve your aim.

>Spark performance tuning is the process of adjusting the settings for the memory, cores, and instances used by the system. Done well, it keeps Spark performing smoothly and prevents resources from becoming a bottleneck.

Assuming you are using Amazon EMR to execute your Spark jobs, there are three aspects you need to take care of:
1. EMR Sizing
2. Spark Configurations
3. Job Tuning



<a id='emr-sizing'></a>
### EMR Sizing

Sizing your EMR cluster is extremely important, as it affects the efficiency of your Spark jobs. Apart from the cost factor, it decides the maximum number of nodes and the amount of memory your job can use. If you spin up an EMR cluster with high specifications, you are obviously paying more for it, so you should ideally utilize it to the max. These are the factors I look at to make sure the cluster is rightly sized:

1. Size of the input data (include all the input data) on the disk.
2. Whether the jobs have transformations or are just a straight pass-through.<br> Assess the joins involved, especially the complex ones.
3. Size of the output data on the disk.

Weigh the above criteria against the memory you need for processing and the disk space you would need. Start with a small configuration and keep adding nodes until you arrive at an optimal configuration. If you are wondering about the *execution time vs EMR configuration* trade-off, it is okay for a job to run longer rather than adding more resources to the cluster. For example, it is okay to run a job for 40 minutes on a 5-node cluster rather than running it in 10 minutes on a 15-node cluster.


Another thing you need to know about EMR is the different EC2 instance types provided by Amazon. I will briefly talk about them, but I strongly recommend reading more about them in the [official documentation](https://aws.amazon.com/ec2/instance-types/). There are 5 instance classes; based on the job you want to run, you can decide which one to use:

>Instance Class | Description
>--- | ---
>General purpose | Balance of compute, memory and networking resources
>Compute optimized | Ideal for compute bound applications that benefit from high performance processors
>Memory optimized | Designed to deliver fast performance for workloads that process large data sets in memory
>Storage optimized | For workloads that require high, sequential read and write access to very large data sets on local storage
>GPU instances | Use hardware accelerators, or co-processors, to perform high demanding functions, more efficiently than is possible in software running on CPUs

The configuration (memory, storage, cpu, network performance) will differ based on the instance class you choose.<br>
To make life easier, here is what I do when I can't decide which one to go with: <br>
 1. Visit [ec2instances](https://www.ec2instances.info/)
 2. Choose the EC2 instances in question
 3. Click on compare selected

This will help you easily understand what you are getting into, and thereby make the best choice! The site was built by [Garret Heaton](https://github.com/powdahound) (founder of Swoot), and has helped me make an informed decision countless times.

<a id='spark-configurations'></a>
### Spark Configurations

There are a ton of [configurations](https://spark.apache.org/docs/latest/configuration.html) that you can tweak when it comes to Spark. Here, I will note down some of the configurations I use that have worked well for me. Alright, let's get into it!

#### Job Scheduling

When you submit your job to a cluster, it is handed to Spark's scheduler, which is responsible for materializing a logical plan for your job. There are two [job scheduling](https://spark.apache.org/docs/latest/job-scheduling.html) modes for jobs within an application:
1. FIFO<br>
By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into stages (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly.
2. FAIR<br>
The fair scheduler supports grouping jobs into pools and setting different scheduling options (e.g. weight) for each pool. This can be useful to create a high-priority pool for more important jobs, for example, or to group the jobs of each user together and give users equal shares regardless of how many concurrent jobs they have instead of giving jobs equal shares. This approach is modeled after the Hadoop Fair Scheduler.

> I personally prefer using the FAIR mode, and this can be set by adding `.config("spark.scheduler.mode", "FAIR")` when you create your SparkSession.


#### Serializer

We have two types of [serializers](https://spark.apache.org/docs/latest/tuning.html#data-serialization) available:
1. Java serialization
2. Kryo serialization

Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance.

Java serialization is used by default because any custom class that implements `java.io.Serializable` can be used with it easily. You can also control the performance of your serialization more closely by extending `java.io.Externalizable`.

> The general recommendation is to use Kryo as the serializer whenever possible, as it leads to much smaller sizes than Java serialization. It can be enabled by adding `.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")` when you create your SparkSession. Keep in mind that this setting applies to JVM-side data; Python objects in PySpark RDDs are pickled by PySpark's own serializers.


#### Shuffle Behaviour

It is generally a good idea to compress map output files. The `spark.shuffle.compress` property decides whether to compress them, and the codec used is set by `spark.io.compression.codec`.

> The property is `true` by default, but it can be set explicitly by adding `.config("spark.shuffle.compress", "true")` when you create your SparkSession.

#### Compression and Serialization

Spark provides 4 built-in codecs to compress internal data such as RDD partitions, event logs, broadcast variables and shuffle outputs. They are:

1. lz4
2. lzf
3. snappy
4. zstd

> The decision on which one to use depends on the use case. I generally use `snappy` (the default is `lz4`). Google created Snappy because they needed something that offered very fast compression at the expense of final size. Snappy is fast, stable and free, but its output is larger than that of the other codecs. At the same time, since it costs less compute, it seems like a balanced trade-off. The property can be set by adding `.config("spark.io.compression.codec", "snappy")` when you create your SparkSession.

This [session](https://databricks.com/session/best-practice-of-compression-decompression-codes-in-apache-spark) explains best practices for compression/decompression codecs in Apache Spark. I recommend taking a look at it before making a decision.

#### Scheduling

The property `spark.speculation` enables speculative execution of tasks. If one or more tasks are running slowly in a stage, Spark launches copies of them. Speculative execution does not stop the slow-running task; it runs the new copy in parallel, and whichever finishes first is used.

> I usually keep this option disabled (it is `false` by default in Spark) and set it explicitly by adding `.config("spark.speculation", "false")` when I create the SparkSession.

#### Application Properties

There are mainly two application properties that you should know about:

1. spark.driver.memoryOverhead - The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). This option is currently supported on YARN and Kubernetes.

2. spark.executor.memoryOverhead - The amount of off-heap memory to be allocated per executor, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). This option is currently supported on YARN and Kubernetes.

> If you ever face an error like `Container killed by YARN for exceeding memory limits`, it is usually because you have not specified enough memory overhead for your job to execute successfully. The default overhead is 10% of the driver or executor memory (calculated separately for each), with a minimum of 384 MiB.
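The default can be worked out by hand. A sketch of the arithmetic, assuming the 10% factor and 384 MiB minimum described above (`default_memory_overhead_mib` is a hypothetical helper, not a Spark API). If you keep hitting the YARN error, set `spark.executor.memoryOverhead` (or `spark.driver.memoryOverhead`) explicitly above this value:

```python
def default_memory_overhead_mib(memory_mib: int, factor: float = 0.10, minimum_mib: int = 384) -> int:
    """Hypothetical helper: approximate default overhead = max(factor * memory, minimum)."""
    return max(int(memory_mib * factor), minimum_mib)


for executor_memory_mib in (2048, 4096, 16384):
    overhead = default_memory_overhead_mib(executor_memory_mib)
    # On YARN, the container request is roughly heap + overhead
    print(f"{executor_memory_mib} MiB heap -> {overhead} MiB overhead "
          f"-> ~{executor_memory_mib + overhead} MiB container")
```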



#### Dynamic Allocation

Lastly, I want to talk about Dynamic Allocation. This is a feature I use constantly while executing my jobs. The property `spark.dynamicAllocation.enabled` is set to `false` by default. As the name suggests, it controls whether to use dynamic resource allocation, which scales the number of executors registered with the application up and down based on the workload. It is truly a wonderful feature, and its greatest benefit is that it helps you make the best use of all the resources you have! Its disadvantage is that it does not shine when you have to execute jobs in parallel. Since most of the resources will be used by the first job, the second one will have to wait until some resources are released. If both are submitted at exactly the same time, the resources will be shared between them, although not equally. Also, it is not guaranteed to *always* use the most optimal configuration. But in all my tests, the results have been great!

> If you are planning on using this feature, you can pass the configurations as required through the spark-submit command. The four configurations which you will have to keep in mind are:<br>
```
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.initialExecutors
--conf spark.dynamicAllocation.minExecutors
--conf spark.dynamicAllocation.maxExecutors
```

You can read more about this feature [here](https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation) and [here](https://stackoverflow.com/questions/40200389/how-to-execute-spark-programs-with-dynamic-resource-allocation).





<a id='job-tuning'></a>
### Job Tuning

Apart from EMR and Spark tuning, there is another way to approach optimization, and that is by tuning the job itself to produce results efficiently. I will go over some techniques that will help you achieve this. The [Spark Programming Guide](https://spark.apache.org/docs/2.1.1/programming-guide.html) talks about these concepts in more detail. If you prefer watching a video over reading, I highly recommend [A Deep Dive into Proper Optimization for Spark Jobs](https://youtu.be/daXEp4HmS-E) by Daniel Tomes from Databricks, which I found really useful and informative!

#### Broadcast Joins (Broadcast Hash Join)

For some jobs, efficiency can be increased by keeping a copy of the smaller dataset in the memory of every executor. Broadcast Hash Join (BHJ) is one such technique, which helps you optimize join queries when one side of the join is small.
>Broadcast joins are the fastest, but the drawback is that they consume more memory on both the executors and the driver.

The following steps give a sneak peek into how it works, which will help you understand where it can be used:<br>
1. The input file to be broadcast (the smaller of the two tables) is read by the executors in parallel into their working memory.
2. All the data from the executors is collected into the driver (hence the need for more memory on the driver).
3. The driver then broadcasts the combined dataset (a full copy) to each executor.
4. The broadcast dataset can be several times (10-20x or more) bigger in memory than the input, due to factors like deserialization.
5. Each executor ends up storing both the part it read first and the full copy, which leads to a high memory requirement.

Some things to keep in mind about BHJ:
1. It is advisable to use broadcast joins on small datasets only (a dimension table, for example).
2. Spark does not guarantee that BHJ is always chosen, since not all cases (e.g. full outer join) support BHJ.
3. You might notice skew in tasks due to uneven partition sizes, especially during aggregations, joins etc. This can be evened out by introducing a salt value (a random value).<br>*Suggested formula for the salt value:* random(0, shuffle partition count - 1)


#### Spark Partitions

A partition in Spark is an atomic chunk of data (a logical division of the data) stored on a node in the cluster. Partitions are the basic units of parallelism in Spark. Having too many partitions or too few is not ideal: the number of partitions should be decided based on the cluster configuration and the requirements of the application. Increasing the number of partitions makes each partition hold less data, or even no data at all. Generally, Spark partitioning can be broken down into three stages:
1. Input
2. Shuffle
3. Output


##### Input

Spark usually does a good job of figuring out the ideal configuration for this one, except in very particular cases. It is advisable to use the Spark default unless:
1. You need to increase parallelism
2. You have heavily nested data
3. You are generating data (explode)
4. The source is not optimal
5. You are using UDFs

`spark.sql.files.maxPartitionBytes`: This property sets the maximum number of bytes to pack into a single partition when reading files (default 128 MB). Lowering it creates more, smaller input partitions, which increases parallelism when reading input data. For example, if your cluster has more cores than there are input partitions, reducing this value increases the number of parallel tasks, so that all the cores of the cluster are used and the task runs faster.

##### Shuffle

One of the most common reasons jobs lag in performance is a wrong shuffle partition count. The default value is 200, which is rarely ideal: it is fine for shuffle stages smaller than about 20 GB, but larger stages need a different value. For most cases, you can use the following equation to find the right value:
>`Partition Count = Stage Input Data / Target Size`, where <br>
the target size for the largest shuffle stage should stay below 200 MB per partition in most cases.<br>
Use the `spark.sql.shuffle.partitions` property to set the resulting partition count.

If the size per shuffle partition ever reaches the terabyte range, something is seriously wrong: recalculate the partition count, or set it back to 200. The shuffle partition count can be changed between actions (not between transformations) in a Spark script, because the setting takes effect when an action triggers the query.

Let us use an example to explain this scenario: <br>
Assume shuffle stage input = 210 GB. <br>
Partition Count = Stage Input Data / Target Size = 210000 MB / 200 MB = 1050. <br>
So the shuffle partition count should be 1050, not 200.

However, if your cluster has 2000 cores, set the shuffle partitions to 2000.
>In a large cluster dealing with a large data job, never set your shuffle partitions less than your total core count.
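
The rule of thumb and the worked example can be captured in a small helper. The 400-core cluster is hypothetical; 200 MB is the target size from the text. In a PySpark session, the result would then be applied with `spark.conf.set("spark.sql.shuffle.partitions", n)`.

```python
import math

def shuffle_partition_count(stage_input_mb: float, total_cores: int,
                            target_mb_per_partition: float = 200) -> int:
    # Partition Count = Stage Input Data / Target Size, rounded up ...
    by_size = math.ceil(stage_input_mb / target_mb_per_partition)
    # ... but never fewer partitions than cores, or some cores sit idle.
    return max(by_size, total_cores)

print(shuffle_partition_count(210_000, total_cores=400))    # 1050
print(shuffle_partition_count(210_000, total_cores=2_000))  # 2000
```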



Shuffle stages almost always precede the write stages, and a high shuffle partition count creates many small files in the output. To address this, call `localCheckpoint()` just before the write and then `coalesce()`. The local checkpoint materializes the shuffled partitions in executor storage (memory and local disk) and truncates the lineage, so the subsequent coalesce reduces only the number of partitions being written, not the parallelism of the shuffle stage itself. This improves the overall performance of both the shuffle stage and the write stage.
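
Choosing the coalesce target is simple arithmetic. In PySpark the call chain would look something like `df.localCheckpoint().coalesce(n).write.parquet(path)`, where `df`, `n`, and `path` are placeholders. The sketch below assumes each partition becomes one output file (no `partitionBy`), and the 5 GB output and 256 MB target file size are hypothetical choices, not recommendations from this guide.

```python
import math

def average_file_mb(output_mb: float, num_partitions: int) -> float:
    # Without coalescing, each shuffle partition is written as its own output file.
    return output_mb / num_partitions

def coalesce_target(output_mb: float, target_file_mb: float = 256) -> int:
    # Number of partitions (and therefore files) that gives roughly target_file_mb per file.
    return max(1, math.ceil(output_mb / target_file_mb))

print(f"{average_file_mb(5_120, 1_050):.2f} MB per file without coalesce")
print(coalesce_target(5_120), "partitions after coalesce")
```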

##### Output

There are different ways to write the data. While writing, you can control the size, composition, and number of output files, and even the number of records in each file. You can increase the parallelism of the write to make use of all the resources you have, but this produces a larger number of smaller files. Usually this isn't a problem; if you need bigger files, use one of the compaction techniques, ideally on a smaller cluster. There are multiple ways to change the composition of the output. Keep these two in mind:
1. Coalesce: Use this to reduce the number of partitions.
2. Repartition: Use this very rarely, and never to reduce the number of partitions.<br>
    a. Range Partitioner - Assigns rows to partitions based on sorted ranges of key values. <br>
    b. Hash Partitioner - Assigns each row to a partition based on the hash of its key. Hash partitioning can produce skewed partitions when a few key values dominate the data.
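
A toy, pure-Python model of the three options (not Spark's implementation): `coalesce` merges neighbouring partitions without re-hashing rows, the hash partitioner routes each key by its hash, and the range partitioner routes each key by sorted boundaries. Spark's range partitioner samples the data to pick boundaries, whereas this sketch simply sorts all keys, and `zlib.crc32` stands in for Spark's hash function. In PySpark these correspond roughly to `coalesce(n)`, `repartition(n, col)` and `repartitionByRange(n, col)`.

```python
import bisect
import zlib
from typing import List, Sequence

def coalesce(partitions: List[list], n: int) -> List[list]:
    # Merge neighbouring partitions into at most n groups; rows are never re-hashed,
    # which is why Spark can avoid a full shuffle here. It cannot increase the count.
    n = min(n, len(partitions))
    size = len(partitions)
    return [
        [row for part in partitions[size * i // n: size * (i + 1) // n] for row in part]
        for i in range(n)
    ]

def hash_partition(keys: Sequence, n: int) -> List[list]:
    # Each key goes to hash(key) mod n; one very frequent key means one very large partition.
    out: List[list] = [[] for _ in range(n)]
    for key in keys:
        out[zlib.crc32(str(key).encode()) % n].append(key)
    return out

def range_partition(keys: Sequence, n: int) -> List[list]:
    # Boundaries split the sorted keys into n roughly equal ranges.
    ordered = sorted(keys)
    bounds = [ordered[len(ordered) * i // n] for i in range(1, n)]
    out: List[list] = [[] for _ in range(n)]
    for key in keys:
        out[bisect.bisect_right(bounds, key)].append(key)
    return out

print(coalesce([[1], [2], [3], [4], [5], [6]], 4))                # [[1], [2, 3], [4], [5, 6]]
print([len(p) for p in range_partition(list(range(1, 101)), 4)])  # [25, 25, 25, 25]
print(max(len(p) for p in hash_partition(["hot"] * 90 + ["a", "b"], 4)))
```

The last line illustrates the skew warning: all 90 `"hot"` rows share one hash and therefore one partition.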

<a id='best-practices'></a>
### Best Practices

Try to incorporate these into your coding habits for better performance:
1.   Use NOT EXISTS instead of NOT IN.
2.   Remove unnecessary counts and distinct counts (use `approx_count_distinct`, formerly `approxCountDistinct`, where an estimate is enough).
3.   Drop duplicates early.
4.   Always prefer built-in Spark SQL functions over Pandas UDFs.
5.   Use Hive partitions effectively.
6.   Leverage the Spark UI effectively.
7.   Avoid shuffle spills.
8.   Aim for a cluster utilization of at least 70%.
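
On the first point: in Spark, a `NOT IN` subquery is typically planned as a null-aware anti join, which can end up as an expensive nested-loop join, while `NOT EXISTS` becomes a plain anti join. The two are also not interchangeable when the subquery can return NULL. The sketch below uses Python's built-in `sqlite3` purely as a convenient stand-in for standard SQL NULL semantics, which Spark SQL also follows; the `customers` and `blocked` tables are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (id INTEGER);
    CREATE TABLE blocked (customer_id INTEGER);
    INSERT INTO customers VALUES (1), (2), (3);
    INSERT INTO blocked VALUES (2), (NULL);
""")

# NOT IN: `id NOT IN (2, NULL)` is never TRUE (it is FALSE or NULL), so no row survives.
not_in = conn.execute(
    "SELECT id FROM customers WHERE id NOT IN (SELECT customer_id FROM blocked) ORDER BY id"
).fetchall()

# NOT EXISTS: a correlated anti join; the NULL row simply never matches.
not_exists = conn.execute(
    """SELECT id FROM customers c
       WHERE NOT EXISTS (SELECT 1 FROM blocked b WHERE b.customer_id = c.id)
       ORDER BY id"""
).fetchall()

print(not_in)      # []
print(not_exists)  # [(1,), (3,)]
```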



In [None]:
%%shell
jupyter nbconvert --to html /content/Colab_and_PySpark.ipynb

[NbConvertApp] Converting notebook /content/Colab_and_PySpark.ipynb to html
[NbConvertApp] Writing 873303 bytes to /content/Colab_and_PySpark.html


