In [0]:
#import libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
#create customer dataframe
#data
cust_data = [
    ("Alex", 123457, 20101012, 20122013, "MVD", "Paul", "SA", "USA", "06031987", "A"),
    ("John", 123458, 20101012 ,20121999, "MVD","Paul", "TN", "IND", "06031987", "A"),
    ("Mathew", 123459, 20101012, 20122003, "MVD","Paul", "WAS", "PHIL", "10031998", "A"),
    ("Matt", 12345, 20101012, 20122000, "MVD","Paul", "BOS", "NYC", "01051984", "A"),
    ("Jacob", 125684, 20101012, 20121983, "MVD","Paul", "VIC", "AU", "02081989", "A"),
    ("Chuck", 125679, 20101012, 20121989, "MVD","Paul", "VIC", "AU", "06011999", "A"),
    ("Rosy", 125645, 20101012, 20121987, "MVD","Paul", "VIC", "NYC", "06111987", "A"),
    ("Rosy", 125645, 20101012, 20121990, "MVD","Paul", "VIC", "NYC", "06111987", "A"),
    ("Patrick", 125674, 20101012, 20122006, "MVD","Paul", "AP", "IND", "06071999", "A"),
    ("Murphy", 125623, 20101012, 20122013, "MVD","Paul", "ALP", "PHIL", "06031975", "A"),
    ("Maria", 125698, 20101012, 20122001, "MVD","Paul", "ROK", "USA", "06032001", "A"),
    ("Peter", 125640, 20101012, 20121981, "MVD","Paul", "PAT", "USA", "06031995", "A")
]

#schema
cust_schema = ["Name", "Cust_I", "Open_Dt", "Consul_Dt", "VAC_ID", "DR_Name", "State", "County", "DOB", "FLAG"]

#raw dataframe
cust_df = spark.createDataFrame(data=cust_data, schema=cust_schema)

In [0]:
cust_df.show()

+-------+------+--------+---------+------+-------+-----+------+--------+----+
|   Name|Cust_I| Open_Dt|Consul_Dt|VAC_ID|DR_Name|State|County|     DOB|FLAG|
+-------+------+--------+---------+------+-------+-----+------+--------+----+
|   Alex|123457|20101012| 20122013|   MVD|   Paul|   SA|   USA|06031987|   A|
|   John|123458|20101012| 20121999|   MVD|   Paul|   TN|   IND|06031987|   A|
| Mathew|123459|20101012| 20122003|   MVD|   Paul|  WAS|  PHIL|10031998|   A|
|   Matt| 12345|20101012| 20122000|   MVD|   Paul|  BOS|   NYC|01051984|   A|
|  Jacob|125684|20101012| 20121983|   MVD|   Paul|  VIC|    AU|02081989|   A|
|  Chuck|125679|20101012| 20121989|   MVD|   Paul|  VIC|    AU|06011999|   A|
|   Rosy|125645|20101012| 20121987|   MVD|   Paul|  VIC|   NYC|06111987|   A|
|   Rosy|125645|20101012| 20121990|   MVD|   Paul|  VIC|   NYC|06111987|   A|
|Patrick|125674|20101012| 20122006|   MVD|   Paul|   AP|   IND|06071999|   A|
| Murphy|125623|20101012| 20122013|   MVD|   Paul|  ALP|  PHIL|0

In [0]:
# Change date formats from string to date type
# add new columns : Age & LastConsultDate

cust_with_age_df = cust_df.withColumn("DOB", to_date(col("DOB"), "ddMMyyyy")) \
           .withColumn("Age",(date_diff(current_date(), col("DOB"))/365))\
           .withColumn("Consul_Dt",to_date(col("Consul_Dt"), "ddMMyyyy"))\
           .withColumn("LastConsultDate",(date_diff(current_date(), col("Consul_Dt"))/365))

cust_with_age_df.show()

+-------+------+--------+----------+------+-------+-----+------+----------+----+------------------+------------------+
|   Name|Cust_I| Open_Dt| Consul_Dt|VAC_ID|DR_Name|State|County|       DOB|FLAG|               Age|   LastConsultDate|
+-------+------+--------+----------+------+-------+-----+------+----------+----+------------------+------------------+
|   Alex|123457|20101012|2013-12-20|   MVD|   Paul|   SA|   USA|1987-03-06|   A| 37.66027397260274|10.849315068493151|
|   John|123458|20101012|1999-12-20|   MVD|   Paul|   TN|   IND|1987-03-06|   A| 37.66027397260274| 24.86027397260274|
| Mathew|123459|20101012|2003-12-20|   MVD|   Paul|  WAS|  PHIL|1998-03-10|   A| 26.64109589041096| 20.85753424657534|
|   Matt| 12345|20101012|2000-12-20|   MVD|   Paul|  BOS|   NYC|1984-05-01|   A| 40.50684931506849| 23.85753424657534|
|  Jacob|125684|20101012|1983-12-20|   MVD|   Paul|  VIC|    AU|1989-08-02|   A| 35.24931506849315| 40.87123287671233|
|  Chuck|125679|20101012|1989-12-20|   MVD|   Pa

In [0]:
# filter out customers with LastConsultDate > 30
new_cust_df = cust_with_age_df.filter(col("LastConsultDate")>30)

In [0]:
new_cust_df.show()

+-----+------+--------+----------+------+-------+-----+------+----------+----+------------------+------------------+
| Name|Cust_I| Open_Dt| Consul_Dt|VAC_ID|DR_Name|State|County|       DOB|FLAG|               Age|   LastConsultDate|
+-----+------+--------+----------+------+-------+-----+------+----------+----+------------------+------------------+
|Jacob|125684|20101012|1983-12-20|   MVD|   Paul|  VIC|    AU|1989-08-02|   A| 35.24931506849315| 40.87123287671233|
|Chuck|125679|20101012|1989-12-20|   MVD|   Paul|  VIC|    AU|1999-01-06|   A|25.813698630136987|34.865753424657534|
| Rosy|125645|20101012|1987-12-20|   MVD|   Paul|  VIC|   NYC|1987-11-06|   A|36.989041095890414|36.868493150684934|
| Rosy|125645|20101012|1990-12-20|   MVD|   Paul|  VIC|   NYC|1987-11-06|   A|36.989041095890414|33.865753424657534|
|Peter|125640|20101012|1981-12-20|   MVD|   Paul|  PAT|   USA|1995-03-06|   A|29.654794520547945| 42.87123287671233|
+-----+------+--------+----------+------+-------+-----+------+--

In [0]:
# finalize the distinct eligible customers with recent LastConsultDate

eligible_cust_df = new_cust_df.withColumn("Rank", row_number().over(Window.partitionBy("Cust_I").orderBy(col("LastConsultDate"))))\
                .filter(col("Rank") == 1).drop("Rank")


eligible_cust_df.show()

+-----+------+--------+----------+------+-------+-----+------+----------+----+------------------+------------------+
| Name|Cust_I| Open_Dt| Consul_Dt|VAC_ID|DR_Name|State|County|       DOB|FLAG|               Age|   LastConsultDate|
+-----+------+--------+----------+------+-------+-----+------+----------+----+------------------+------------------+
|Peter|125640|20101012|1981-12-20|   MVD|   Paul|  PAT|   USA|1995-03-06|   A|29.654794520547945| 42.87123287671233|
| Rosy|125645|20101012|1990-12-20|   MVD|   Paul|  VIC|   NYC|1987-11-06|   A|36.989041095890414|33.865753424657534|
|Chuck|125679|20101012|1989-12-20|   MVD|   Paul|  VIC|    AU|1999-01-06|   A|25.813698630136987|34.865753424657534|
|Jacob|125684|20101012|1983-12-20|   MVD|   Paul|  VIC|    AU|1989-08-02|   A| 35.24931506849315| 40.87123287671233|
+-----+------+--------+----------+------+-------+-----+------+----------+----+------------------+------------------+



In [0]:
# save the data

eligible_cust_df.write.mode("append")\
        .option("path","FileStore/tables/paritionByCountry/")\
        .partitionBy("County")\
        .save()