In [1]:
import os

import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

In [2]:
#### Setting up the SparkSession
# getOrCreate() reuses an already running session/context, so this cell can be
# re-executed on a live kernel. Building a second SparkContext by hand fails with
# "Cannot run multiple SparkContexts at once".
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Keep `sc` bound to the session's underlying context, as before.
sc = spark.sparkContext

In [3]:
#### Load a toy csv dataset about synthetic customers for illustration
ROOT_PATH = os.getcwd()
INPUT_PATH = os.path.join(ROOT_PATH, "inputs", "historized_customer.csv")

# The file has a header row and uses ";" as separator; Spark infers the column types.
df = (
    spark.read
    .options(header=True, inferSchema=True, delimiter=";")
    .csv(INPUT_PATH)
)

In [4]:
#### General information about our dataframe
# Row count triggers a Spark job; the column count is read from the schema.
df_shape = (df.count(), len(df.columns))
print("Shape: ", df_shape, "\n")

n_partitions = df.rdd.getNumPartitions()
print("Nbr of partitions:", n_partitions, "\n")

print("---- Schema ----")
df.printSchema()

Shape:  (344, 7) 

Nbr of partitions: 1 

---- Schema ----
root
 |-- CUSTOMER_ID: string (nullable = true)
 |-- YoB: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- AUM: integer (nullable = true)
 |-- SEGMENT: string (nullable = true)
 |-- valid_from: string (nullable = true)
 |-- valid_to: string (nullable = true)



In [5]:
# Lower-case all column names and convert the date columns to DateType
df = df.select(*(F.col(name).alias(name.lower()) for name in df.columns))

# Dates arrive as strings such as "01.06.2019"
df = (
    df.withColumn("valid_from", F.to_date(F.col("valid_from"), "dd.MM.yyyy"))
      .withColumn("valid_to", F.to_date(F.col("valid_to"), "dd.MM.yyyy"))
)

# Age is derived from the year of birth (yob), using 2021 as the fixed reference year
df = df.withColumn("age", F.lit(2021) - F.col("yob"))

df.show(5)

+-----------+----+-------+------+-------+----------+----------+---+
|customer_id| yob|country|   aum|segment|valid_from|  valid_to|age|
+-----------+----+-------+------+-------+----------+----------+---+
|    CUST001|1954|     UK|150000|     S1|2019-06-01|9999-12-12| 67|
|    CUST001|1954|     FR|160000|     S1|2014-12-20|2019-06-01| 67|
|    CUST001|1954|     CH|170000|     S1|2000-01-01|2014-12-20| 67|
|    CUST002|1955|     UK|180000|     S1|1996-06-01|9999-12-12| 66|
|    CUST003|1956|     UK|190000|     S1|1996-06-02|9999-12-12| 65|
+-----------+----+-------+------+-------+----------+----------+---+
only showing top 5 rows



In [6]:
#### Deduplication with a Window-filter

# Note: depending on the use case, one may need F.rank() or F.dense_rank() instead of F.row_number()
def dedup_funk(table, partition_cols=("customer_id",), order_cols=("valid_from", "valid_to")):
    """Keep a single row per partition key, chosen by descending order columns.

    Args:
        table: DataFrame to deduplicate.
        partition_cols: Names of the columns that identify a group of duplicates.
            Defaults to ("customer_id",).
        order_cols: Names of the columns sorted in descending order inside each
            group; the first row after sorting is kept.
            Defaults to ("valid_from", "valid_to").

    Returns:
        DataFrame with the same columns as `table` and one row per distinct
        combination of `partition_cols`.
    """
    w = Window.partitionBy(*partition_cols)\
              .orderBy(*[F.col(c).desc() for c in order_cols])

    # "rn" is only a helper column for the filter and is dropped again.
    return table.select("*", F.row_number().over(w).alias("rn"))\
                .filter(F.col("rn") == 1)\
                .drop("rn")

# Show the output example ordered by customer_ids in ascending order
df_deduped = dedup_funk(df).orderBy("customer_id")
df_deduped.show(5)

+-----------+----+-------+------+-------+----------+----------+---+
|customer_id| yob|country|   aum|segment|valid_from|  valid_to|age|
+-----------+----+-------+------+-------+----------+----------+---+
|    CUST001|1954|     UK|150000|     S1|2019-06-01|9999-12-12| 67|
|    CUST002|1955|     UK|180000|     S1|1996-06-01|9999-12-12| 66|
|    CUST003|1956|     UK|190000|     S1|1996-06-02|9999-12-12| 65|
|    CUST004|1957|     UK|200000|     S1|1996-06-03|9999-12-12| 64|
|    CUST005|1958|     UK|210000|     S1|1996-06-04|9999-12-12| 63|
+-----------+----+-------+------+-------+----------+----------+---+
only showing top 5 rows



In [7]:
#### Pivot with multi aggregation

# Passing the pivot values explicitly is more efficient: Spark then does not have
# to compute the list of distinct values internally first.

def pivot_funk(table):
    """Sum `aum` and `age` per country, spread over one column pair per segment (S1, S2)."""
    sums = []
    for name in ("aum", "age"):
        sums.append(F.sum(F.col(name)).alias("sum_" + name))

    by_country = table.groupBy("country")
    return by_country.pivot("segment", ["S1", "S2"]).agg(*sums)


df_pivot = pivot_funk(df)
df_pivot.show()

+-------+----------+----------+----------+----------+
|country|S1_sum_aum|S1_sum_age|S2_sum_aum|S2_sum_age|
+-------+----------+----------+----------+----------+
|     NL|    230000|        61|      null|      null|
|     MX|      null|      null| 321950000|      3994|
|     ES|  14490000|       336|      null|      null|
|     FR|    160000|        67|  37830000|      1079|
|     CH|   9410000|       928|      null|      null|
|     UK|  42520000|      2405| 214970000|      2066|
+-------+----------+----------+----------+----------+



In [8]:
# Window function to compute average age per country
def avg_age_funk(table):
    """Return `table` with an extra `avg_per_country` column.

    The new column holds the mean of `age` over all rows sharing the same `country`.
    """
    per_country = Window.partitionBy("country")
    mean_age = F.avg("age").over(per_country).alias("avg_per_country")
    return table.select("*", mean_age)

df_avg_age = avg_age_funk(df)
df_avg_age.show(10)

+-----------+----+-------+-------+-------+----------+----------+---+------------------+
|customer_id| yob|country|    aum|segment|valid_from|  valid_to|age|   avg_per_country|
+-----------+----+-------+-------+-------+----------+----------+---+------------------+
|    CUST007|1960|     NL| 230000|     S1|2018-07-08|9999-12-12| 61|              61.0|
|    CUST150|1999|     MX|1670000|     S2|1996-10-27|9999-12-12| 22|29.153284671532848|
|    CUST151|2000|     MX|1680000|     S2|1996-10-28|9999-12-12| 21|29.153284671532848|
|    CUST152|2001|     MX|1690000|     S2|1996-10-29|9999-12-12| 20|29.153284671532848|
|    CUST153|2002|     MX|1700000|     S2|1996-10-30|9999-12-12| 19|29.153284671532848|
|    CUST154|2003|     MX|1710000|     S2|1996-10-31|9999-12-12| 18|29.153284671532848|
|    CUST155|2004|     MX|1720000|     S2|1996-11-01|9999-12-12| 17|29.153284671532848|
|    CUST156|2005|     MX|1730000|     S2|1996-11-02|9999-12-12| 16|29.153284671532848|
|    CUST157|2006|     MX|174000

In [None]:
#### Broadcast small table in a join to speed up the process
def fast_join(big_table, small_table, on="PK", how="left"):
    """Join `big_table` with `small_table`, hinting Spark to broadcast the small side.

    Args:
        big_table: The large DataFrame (left side of the join).
        small_table: The small DataFrame to mark for broadcast (right side).
        on: Join column name(s) or join expression. Defaults to "PK".
        how: Join type. Defaults to "left".

    Returns:
        The joined DataFrame.
    """
    # The hint is applied directly to the frame that is joined. No coalesce(1) is
    # needed: it would only squeeze the scan of the small table into a single task.
    return big_table.join(F.broadcast(small_table), on, how)

In [9]:
#### Create a table from scratch
def create_df():
    """Build a three-row demo DataFrame with an explicit schema.

    Columns: Firstname, Lastname, SWIFT_msg (strings), Age (integer) and
    array_of_strings (array of strings). Some values are deliberately None.
    """
    columns = [
        ("Firstname", T.StringType()),
        ("Lastname", T.StringType()),
        ("Age", T.IntegerType()),
        ("array_of_strings", T.ArrayType(T.StringType())),
        ("SWIFT_msg", T.StringType()),
    ]
    schema = T.StructType([T.StructField(name, dtype) for name, dtype in columns])

    # One tuple per row, in the same order as `columns`
    records = [
        ("James", "Bond", 43, ["chat", "chien"], None),
        ("Pierre", "Bin", 54, ["pizza", "milk", None], "32A: 1700.45, 52E: CHF"),
        ("Lara", "Tempo", None, ["US", "FR"], "32A: 1700.45, 52E: CHF, 70: London"),
    ]
    names = [name for name, _ in columns]
    rows = [dict(zip(names, record)) for record in records]

    return spark.createDataFrame(rows, schema)


my_df = create_df()
my_df.show()

+---------+--------+----+-------------------+--------------------+
|Firstname|Lastname| Age|   array_of_strings|           SWIFT_msg|
+---------+--------+----+-------------------+--------------------+
|    James|    Bond|  43|      [chat, chien]|                null|
|   Pierre|     Bin|  54|[pizza, milk, null]|32A: 1700.45, 52E...|
|     Lara|   Tempo|null|           [US, FR]|32A: 1700.45, 52E...|
+---------+--------+----+-------------------+--------------------+



In [10]:
#### Parsing

# Let's try to get the amount in two different ways
# The first approach uses "split" and getItem functions
# The second approach uses regex extraction
def parser(table):
    """Extract the amount following the "32A:" tag of `SWIFT_msg` in two ways.

    Args:
        table: DataFrame with a string column `SWIFT_msg`.

    Returns:
        `table` with two extra string columns:
        - Amount_method1: text between "32A:" and ", 52E:", with surrounding spaces trimmed.
        - Amount_method2: the decimal number captured by the regex after "32A: ".
    """
    # Everything after the "32A:" tag ...
    after_tag = F.split(F.col("SWIFT_msg"), "32A:").getItem(1)
    # ... up to the next field. The split leaves the blank that follows "32A:",
    # so trim it to make both methods return the same string.
    amount_by_split = F.trim(F.split(after_tag, ", 52E:").getItem(0))

    amount_by_regex = F.regexp_extract("SWIFT_msg", r'(32A:\s)([0-9]*\.[0-9]{1,})', 2)

    return table.withColumn("Amount_method1", amount_by_split)\
                .withColumn("Amount_method2", amount_by_regex)


parse_df = parser(my_df)
parse_df.show()

+---------+--------+----+-------------------+--------------------+--------------+--------------+
|Firstname|Lastname| Age|   array_of_strings|           SWIFT_msg|Amount_method1|Amount_method2|
+---------+--------+----+-------------------+--------------------+--------------+--------------+
|    James|    Bond|  43|      [chat, chien]|                null|          null|          null|
|   Pierre|     Bin|  54|[pizza, milk, null]|32A: 1700.45, 52E...|       1700.45|       1700.45|
|     Lara|   Tempo|null|           [US, FR]|32A: 1700.45, 52E...|       1700.45|       1700.45|
+---------+--------+----+-------------------+--------------------+--------------+--------------+



In [None]:
#### Join on Levenshtein distance condition
def special_cond_join(table1, table2):
    """Inner-join two tables on similar first names and equal ages.

    A pair of rows matches when the Levenshtein distance between
    `table1.Firstname` and `table2.prenom` is below 5 and
    `table1.years_old` equals `table2.age`.
    """
    names_are_close = F.levenshtein(table1["Firstname"], table2["prenom"]) < 5
    same_age = table1["years_old"] == table2["age"]
    return table1.join(table2, names_are_close & same_age, "inner")

In [None]:
#### String cleaner function using regex
def cleaner(table, list_cols, replacement=""):
    """Add a `cleaned_<col>` column for every column name in `list_cols`.

    Each run of two or more whitespace characters is replaced by `replacement`,
    then the result is trimmed.

    Args:
        table: DataFrame to clean.
        list_cols: Names of the string columns to clean.
        replacement: Text substituted for each whitespace run. The default ""
            removes the run entirely (words on both sides get joined); pass " "
            to collapse the run to a single space instead.

    Returns:
        `table` with one extra `cleaned_<col>` column per entry of `list_cols`.
    """
    for c in list_cols:
        table = table.withColumn(
            "cleaned_" + c,
            F.trim(F.regexp_replace(c, r'(\s{2,})', replacement)),
        )
    return table

In [11]:
#### Mapping function using SQL like statements
def mapper(table):
    """Project `table` onto renamed name columns plus `new_age` (Age + 2) using SQL expressions."""
    return table.selectExpr(
        "Firstname as prenom",
        "Lastname as nom_de_famille",
        "Age + 2 as new_age",
    )


map_df = mapper(my_df)
map_df.show()

+------+--------------+-------+
|prenom|nom_de_famille|new_age|
+------+--------------+-------+
| James|          Bond|     45|
|Pierre|           Bin|     56|
|  Lara|         Tempo|   null|
+------+--------------+-------+

