### Partitioning

In [0]:
import getpass
username = getpass.getuser()

In [0]:
orders = spark.read.json('/public/retail_db_json/orders')

***Partition data by date***

In [0]:
from pyspark.sql.functions import date_format

orders.withColumn('order_date', date_format('order_date', 'yyyyMMdd')).show()

+-----------------+----------+--------+---------------+
|order_customer_id|order_date|order_id|   order_status|
+-----------------+----------+--------+---------------+
|            11599|  20130725|       1|         CLOSED|
|              256|  20130725|       2|PENDING_PAYMENT|
|            12111|  20130725|       3|       COMPLETE|
|             8827|  20130725|       4|         CLOSED|
|            11318|  20130725|       5|       COMPLETE|
|             7130|  20130725|       6|       COMPLETE|
|             4530|  20130725|       7|       COMPLETE|
|             2911|  20130725|       8|     PROCESSING|
|             5657|  20130725|       9|PENDING_PAYMENT|
|             5648|  20130725|      10|PENDING_PAYMENT|
|              918|  20130725|      11| PAYMENT_REVIEW|
|             1837|  20130725|      12|         CLOSED|
|             9149|  20130725|      13|PENDING_PAYMENT|
|             9842|  20130725|      14|     PROCESSING|
|             2568|  20130725|      15|       CO

In [0]:
dbutils.fs.rm(f'/user/{username}/retail_db/orders_partitioned_by_date', recurse=True)

Out[4]: False

In [0]:
orders. \
    withColumn('order_date', date_format('order_date', 'yyyyMMdd')). \
    coalesce(1). \
    write. \
    partitionBy('order_date'). \
    parquet(f'/user/{username}/retail_db/orders_partitioned_by_date')

#### User Defined functions

Here are the steps we need to follow to develop and use Spark User Defined Functions.
* Develop required logic using Python as programming language.
* Register the function using `spark.udf.register`. Also assign it to a variable.
* Variable can be used as part of Data Frame APIs such as `select`, `filter`, etc.
* When we register, we register with a name. That name can be used as part of `selectExpr` or as part of Spark SQL queries using `spark.sql`.

In [0]:
df = spark.read.json('/public/retail_db_json/orders')
df.show(2)

+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
+-----------------+--------------------+--------+---------------+
only showing top 2 rows



In [0]:
dc = spark.udf.register('date_convert', lambda d: int(d[:10].replace('-', ''))) 
df.select(dc('order_date').alias('order_date')).show(2)

+----------+
|order_date|
+----------+
|  20130725|
|  20130725|
+----------+
only showing top 2 rows



In [0]:
def data_cleanse(c):
    return c.strip() if c.strip() != '\\N' else None

data_cleanse = spark.udf.register('data_cleanse', data_cleanse)

In [0]:
import pandas as pd
from pyspark.sql.functions import * 

courses = {'course_id': ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10'],
           'course_name': ['Mastering SQL', 'Streaming Pipelines - Python', 'Head First Python',
                           'Designing Data-Intensive Applications', 'Distributed Systems', 'Database Internals',
                           'Art of Immutable Architecture', 'Graph Databases', 'Building MicroServices',
                           'Kubernetes Patterns'],
           'course_author': ['Mike Jack', 'Bob Davis', 'Elvis Presley', 'Martin Kleppmann', 'Sukumar Ghosh',
                             'Alex Petrov',
                             'Michael L. Perry', 'Ian Robinson', 'Sam Newman', 'Rolan Hub'],
           'course_status': ['   published   ', '   inactive   ', '\\N', 'published  ', '\\N', '   inactive',
                             'published   ', '\\N', '  inactive ', 'published   '],
           'course_published_dt': ['2020-07-08', '2020-03-10', '\\N', '2021-02-27', '\\N', '2021-05-14',
                                   '2021-04-18', '\\N',
                                   '2020-12-15', '2021-07-11']}

courses_df = spark.createDataFrame(pd.DataFrame(courses))

In [0]:
courses_df.select(
    data_cleanse(col('course_id')).alias('course_id'),
    data_cleanse(col('course_status').alias('course_status'))
).show(2)

+---------+--------------------------------------------+
|course_id|data_cleanse(course_status AS course_status)|
+---------+--------------------------------------------+
|        1|                                   published|
|        2|                                    inactive|
+---------+--------------------------------------------+
only showing top 2 rows

