## User Data Transformation

In [2]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
spark =SparkSession.builder.appName("Name").getOrCreate()

In [4]:
user_df = spark.read.format("csv").options(inferSchema="true",header="true")\
.load("/C:/Users/vedant.mahale/Pyspark/Transaction_project/users_data.csv")

In [5]:
user_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- current_age: integer (nullable = true)
 |-- retirement_age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- birth_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- per_capita_income: string (nullable = true)
 |-- yearly_income: string (nullable = true)
 |-- total_debt: string (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- num_credit_cards: integer (nullable = true)



#### Data Type Conversion:

In [7]:
#Convert per_capita_income, yearly_income, and total_debt fields to numeric formats (e.g., DoubleType) to enable calculations.

user_df= user_df.withColumn("per_capita_income",regexp_replace("per_capita_income","[$]",''))\
                    .withColumn("yearly_income",regexp_replace("yearly_income","[$]",''))\
                        .withColumn("total_debt",regexp_replace("total_debt","[$]",''))

user_df= user_df.withColumn("per_capita_income",user_df["per_capita_income"].cast(DoubleType()))\
                    .withColumn("yearly_income",user_df["yearly_income"].cast('double'))\
                        .withColumn("total_debt",user_df["total_debt"].cast('double'))


#### Date Calculations:

In [9]:
#Derive age from birth_year and birth_month to ensure consistency with current_age.
#Calculate years_until_retirement as retirement_age - current_age.

#user_df = user_df.withColumn("birth_year_month",concate(col("birth_year"),"-",col("birth_month")))

user_df = user_df.withColumn("birth_date", expr("birth_year || '-' || birth_month"))\
                    .withColumn("birth_date",to_date("birth_date"))

user_df = user_df.withColumn("current_date",current_date())
#two new columns added with name "current_date" and "birth_date"


In [10]:
#now creating the column name "current_new_age" and deleting the Old "current_age" Column

user_df = user_df.withColumn("current_new_age",(months_between(col("current_date"),col("birth_date"))/12).cast("int"))

user_df = user_df.drop("current_age")

#### Income Aggregation:

In [12]:
#Create a debt_to_income_ratio by dividing total_debt by yearly_income.

user_df= user_df.withColumn("dept_to_income_ration",round(user_df["total_debt"]/user_df["yearly_income"],2))

#### Geospatial Data Verification:

In [14]:
user_df.filter(user_df.latitude.isNull()).count()

0

In [15]:
#Verify that latitude and longitude fall within valid ranges; handle invalid entries by filtering or imputing values.
#The range for latitude is -90° to 90°, and the range for longitude is -180° to 180°

user_df.filter(user_df.latitude.isNull()).count()
user_df.filter(user_df.longitude.isNull()).count()

user_df = user_df.withColumn("latitude",when((col("latitude") > -90) & (col("latitude") < 90), col("latitude")).otherwise(None))\
                    .withColumn("latitude",when((col("latitude") > -90) & (col("latitude") < 90), col("latitude")).otherwise(None))


In [16]:
user_df.filter(user_df.latitude.isNull()).count()


0

In [17]:
user_df.filter(user_df.longitude.isNull()).count()

0

#### Category Standardization:

In [19]:
#Standardize gender values (e.g., "M", "F", "Other") and handle inconsistent or missing values
user_df = user_df.withColumn(
    "gender",
    when(col("gender") == "Male", "M")
    .when(col("gender") == "Female", "F")
)

#### Credit Score Categorization:

In [21]:
#Segment credit_score into categories (e.g., “Poor,” “Fair,” “Good”) to facilitate analysis.
#For a score with a range between 300 and 850, a credit score of 670 to 739 is considered good. 
#Credit scores above 740 are very good and above 800 are excellent.


user_df = user_df.withColumn("credit_score_indicator",when((col("credit_score").between('670',"739")),"Fair")
                             .when((col("credit_score") >= 740) & (col("credit_score") < 8000),"Good" )
                             .when((col("credit_score") > 800),"Excellent").otherwise("Poor"))

user_df.show(2)


+----+--------------+----------+-----------+------+--------------------+--------+---------+-----------------+-------------+----------+------------+----------------+----------+------------+---------------+---------------------+----------------------+
|  id|retirement_age|birth_year|birth_month|gender|             address|latitude|longitude|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|birth_date|current_date|current_new_age|dept_to_income_ration|credit_score_indicator|
+----+--------------+----------+-----------+------+--------------------+--------+---------+-----------------+-------------+----------+------------+----------------+----------+------------+---------------+---------------------+----------------------+
| 825|            66|      1966|         11|     F|       462 Rose Lane|   34.15|  -117.76|          29278.0|      59696.0|  127613.0|         787|               5|1966-11-01|  2024-11-28|             58|                 2.14|                  Good|


In [22]:
user_df= user_df.withColumnRenamed("id","user_id")

## Card Data transformation

#### Data Type Conversion:


In [25]:
card_data_df = spark.read.format("csv").options(inferSchema="true",header="true")\
.load("/C:/Users/vedant.mahale/Pyspark/Transaction_project/cards_data.csv")

In [26]:
card_data_df= card_data_df.withColumn("expires",regexp_replace("expires","[/]",'-'))\
                    .withColumn("acct_open_date",regexp_replace("acct_open_date","[/]",'-'))

card_data_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_brand: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- card_number: long (nullable = true)
 |-- expires: string (nullable = true)
 |-- cvv: integer (nullable = true)
 |-- has_chip: string (nullable = true)
 |-- num_cards_issued: integer (nullable = true)
 |-- credit_limit: string (nullable = true)
 |-- acct_open_date: string (nullable = true)
 |-- year_pin_last_changed: integer (nullable = true)
 |-- card_on_dark_web: string (nullable = true)



In [27]:
#Convert credit_limit to a numeric type (e.g., DoubleType) for numerical analysis.
#Ensure acct_open_date and expires are in DateType for easier date calculations.
card_data_df = card_data_df.withColumn("expires",to_date("expires","MM-yyyy"))\
                                .withColumn("acct_open_date",to_date("acct_open_date","MM-yyyy"))\
                                    .withColumn("year_pin_last_changed",col("year_pin_last_changed").cast("string"))\
                                        .withColumn("year_pin_last_changed",to_date("year_pin_last_changed"))\
                                        .withColumn("credit_limit",regexp_replace("credit_limit","[$]",'').cast("double"))
                                        


In [28]:
## Date Calculations:

In [29]:
#Derive the account_age in years from acct_open_date and current date.

card_data_df = card_data_df.withColumn("current_date",current_date())

card_data_df = card_data_df.withColumn("account_age",(months_between(col("current_date"),col("acct_open_date"))/12).cast("int"))


In [30]:
#Calculate years_since_pin_change using year_pin_last_changed.

card_data_df = card_data_df.withColumn("years_since_pin_change",(months_between(col("current_date"),col("year_pin_last_changed"))/12).cast("int"))


#### Standardize Columns for Joining:


In [32]:
#Ensure client_id and id have consistent data types and formats across tables for joins.
card_data_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_brand: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- card_number: long (nullable = true)
 |-- expires: date (nullable = true)
 |-- cvv: integer (nullable = true)
 |-- has_chip: string (nullable = true)
 |-- num_cards_issued: integer (nullable = true)
 |-- credit_limit: double (nullable = true)
 |-- acct_open_date: date (nullable = true)
 |-- year_pin_last_changed: date (nullable = true)
 |-- card_on_dark_web: string (nullable = true)
 |-- current_date: date (nullable = false)
 |-- account_age: integer (nullable = true)
 |-- years_since_pin_change: integer (nullable = true)



#### Categorize Card Details:

In [34]:
#the average credit limit for each combination of card type and card brand?

grouped_df= card_data_df.groupBy(col("card_type"),col("card_brand")).agg(round(avg("credit_limit"),2).alias("Average"))
grouped_df.show()


+---------------+----------+--------+
|      card_type|card_brand| Average|
+---------------+----------+--------+
|Debit (Prepaid)|Mastercard|   64.78|
|         Credit|      Visa|11295.56|
|         Credit|  Discover|10816.27|
|          Debit|      Visa|19019.62|
|Debit (Prepaid)|      Visa|    63.8|
|         Credit|Mastercard|10971.65|
|          Debit|Mastercard|18279.71|
|         Credit|      Amex|11436.32|
+---------------+----------+--------+



In [35]:
#Categorize card_type (e.g., "Credit," "Debit") and card_brand (e.g., "Visa," "Mastercard") into predefined groups.

from pyspark.sql import functions as F

card_data_df = card_data_df.withColumn(
    "card_type_category",
    F.when(F.col("card_type") == "Credit", "Credit Card")
    .when(F.col("card_type").like("Debit%"), "Debit Card")
    .otherwise("Other")
).withColumn(
    "card_brand_category",
    F.when(F.col("card_brand") == "Visa", "Visa")
    .when(F.col("card_brand") == "Mastercard", "Mastercard")
    .when(F.col("card_brand")== "Discover","Discover")
    .when(F.col("card_brand")== "Amex","Amex")
    .otherwise("Other Brand")
)


card_data_df.select("card_type", "card_type_category", "card_brand", "card_brand_category").show()

+---------------+------------------+----------+-------------------+
|      card_type|card_type_category|card_brand|card_brand_category|
+---------------+------------------+----------+-------------------+
|          Debit|        Debit Card|      Visa|               Visa|
|          Debit|        Debit Card|      Visa|               Visa|
|          Debit|        Debit Card|      Visa|               Visa|
|         Credit|       Credit Card|      Visa|               Visa|
|Debit (Prepaid)|        Debit Card|Mastercard|         Mastercard|
|         Credit|       Credit Card|      Visa|               Visa|
|          Debit|        Debit Card|      Visa|               Visa|
|          Debit|        Debit Card|Mastercard|         Mastercard|
|Debit (Prepaid)|        Debit Card|Mastercard|         Mastercard|
|Debit (Prepaid)|        Debit Card|Mastercard|         Mastercard|
|          Debit|        Debit Card|Mastercard|         Mastercard|
|          Debit|        Debit Card|Mastercard| 

#### Boolean Conversion:

In [37]:
#Convert has_chip and card_on_dark_web to booleans (e.g., True/False) for consistency.

card_data_df = card_data_df.withColumn("has_chip",col("has_chip").cast("boolean"))\
                            .withColumn("card_on_dark_web",col("card_on_dark_web").cast("boolean"))

card_data_df.select("has_chip","card_on_dark_web").show(5)

+--------+----------------+
|has_chip|card_on_dark_web|
+--------+----------------+
|    true|           false|
|    true|           false|
|    true|           false|
|   false|           false|
|    true|           false|
+--------+----------------+
only showing top 5 rows



In [38]:
card_data_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_brand: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- card_number: long (nullable = true)
 |-- expires: date (nullable = true)
 |-- cvv: integer (nullable = true)
 |-- has_chip: boolean (nullable = true)
 |-- num_cards_issued: integer (nullable = true)
 |-- credit_limit: double (nullable = true)
 |-- acct_open_date: date (nullable = true)
 |-- year_pin_last_changed: date (nullable = true)
 |-- card_on_dark_web: boolean (nullable = true)
 |-- current_date: date (nullable = false)
 |-- account_age: integer (nullable = true)
 |-- years_since_pin_change: integer (nullable = true)
 |-- card_type_category: string (nullable = false)
 |-- card_brand_category: string (nullable = false)



#### Validation Of card Number

In [40]:
#Length of the card_number must be 16 for valid card number 
#show the card number which has length less than or more than 16 

length_df  = card_data_df.withColumn("length",length("card_number"))
valid_card = length_df.withColumn("length_validation",when(col("length") == "16", "valid").otherwise("invalid"))

In [41]:
valid_card.count()


#valid_df = card_data_df.withColumn("card_number",)

6146

In [42]:
from pyspark.sql import functions as F

# Calculate the length of the card_number column
length_df = card_data_df.withColumn("length", F.length(F.col("card_number"))).where(col('length').isin('15'))



In [43]:
length_df.select('length').distinct().show(100,False)

+------+
|length|
+------+
|15    |
+------+



In [44]:
length_df.count()

402

In [45]:
card_data_df.withColumn("length",length("card_number")).show()

+----+---------+----------+---------------+----------------+----------+---+--------+----------------+------------+--------------+---------------------+----------------+------------+-----------+----------------------+------------------+-------------------+------+
|  id|client_id|card_brand|      card_type|     card_number|   expires|cvv|has_chip|num_cards_issued|credit_limit|acct_open_date|year_pin_last_changed|card_on_dark_web|current_date|account_age|years_since_pin_change|card_type_category|card_brand_category|length|
+----+---------+----------+---------------+----------------+----------+---+--------+----------------+------------+--------------+---------------------+----------------+------------+-----------+----------------------+------------------+-------------------+------+
|4524|      825|      Visa|          Debit|4344676511950444|2022-12-01|623|    true|               2|     24295.0|    2002-09-01|           2008-01-01|           false|  2024-11-28|         22|                  

In [46]:
card_data_df = card_data_df.withColumnRenamed("id","card_id")

## Transaction Data transformation

In [48]:
transaction_df = spark.read.format("csv").options(inferSchema="true",header="true")\
.load("/C:/Users/vedant.mahale/Pyspark/Transaction_project/transactions_data.csv")

#### Data Type Conversion:


In [50]:
transaction_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: string (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- zip: double (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- errors: string (nullable = true)



In [51]:
#Convert amount to DoubleType for numerical operations.
#Convert use_chip and errors to a standardized format (e.g., "Yes/No" or Boolean).
transaction_df = transaction_df.withColumn("amount",regexp_replace("amount","[$,-]",'').cast("double"))\
                                    .withColumn("zip",col("zip").cast("integer"))\
                                        .withColumn("use_card",when(col("use_chip") == "Swipe Transaction","true").otherwise("false").cast("boolean"))\
                                            .withColumn("use_online",when(col("use_chip") == "Online Transaction","true").otherwise("false").cast("boolean"))


#### Date and Time Features:


In [53]:
#Extract date components (day, month, year) from date for time-based analysis.
transaction_df = transaction_df.withColumn("year",year("date"))\
                                .withColumn("month",month("date"))\
                                    .withColumn("day",day("date"))


#Calculate hour_of_day to understand transaction timing patterns.
#Calculate transaction_date to understand transaction date patterns.

transaction_df = transaction_df.withColumn("hour_of_day",date_format("date", "HH:mm:ss"))\
                                .withColumn("transaction_date", to_date(col("date")))


#### Standardize Location Data:

In [55]:
#Ensure consistent formatting for merchant_city, merchant_state, and zip (e.g., uppercase city/state, string format for zip).


transaction_df = transaction_df.withColumn("zip",col("zip").cast("string"))\
                                .withColumn("merchant_city",upper(col("merchant_city")))\
                                .withColumn("merchant_state",upper(col("merchant_state")))


#### Merchant Category Code (MCC) Grouping:


In [57]:
#Map mcc to categories like "Retail," "Travel," or "Food" to facilitate BI analysis.

transaction_df.select("mcc").distinct().count()

transaction_df = transaction_df.withColumn(
    "category",
    when(col("mcc").between(5400, 5499), "Retail") \
    .when(col("mcc").between(5812, 5814), "Food") \
    .when(col("mcc").between(3000, 3299), "Travel") \
    .when(col("mcc").between(4814, 4899), "Telecom") \
    .otherwise("Other")
)

#### Error Handling:

In [59]:
#Standardize values in the errors field for easier analysis and reporting.

transaction_df = transaction_df.withColumn(
    "errors",
    when(col("errors").isNull(), "No Error") \
    .when(col("errors").isin("NULL", "N/A", "NA", "null"), "No Error") \
    .when(col("errors").like("%timeout%"), "Timeout Error") \
    .when(col("errors").like("%decline%"), "Transaction Declined") \
    .when(col("errors").like("%invalid%"), "Invalid Entry") \
    .otherwise(col("errors"))
)

In [60]:
transaction_df.show(10)

+-------+-------------------+---------+-------+------+------------------+-----------+-------------+--------------+-----+----+--------+--------+----------+----+-----+---+-----------+----------------+--------+
|     id|               date|client_id|card_id|amount|          use_chip|merchant_id|merchant_city|merchant_state|  zip| mcc|  errors|use_card|use_online|year|month|day|hour_of_day|transaction_date|category|
+-------+-------------------+---------+-------+------+------------------+-----------+-------------+--------------+-----+----+--------+--------+----------+----+-----+---+-----------+----------------+--------+
|7475327|2010-01-01 00:01:00|     1556|   2972|  77.0| Swipe Transaction|      59935|       BEULAH|            ND|58523|5499|No Error|    true|     false|2010|    1|  1|   00:01:00|      2010-01-01|  Retail|
|7475328|2010-01-01 00:02:00|      561|   4575| 14.57| Swipe Transaction|      67570|   BETTENDORF|            IA|52722|5311|No Error|    true|     false|2010|    1|  1

In [61]:
transaction_df = transaction_df.withColumnRenamed("id","Transaction_id")
                                            

## Final Data Joins

In [63]:
#user_df.show(10)

In [64]:
#transaction_df.show(10)

In [65]:
#card_data_df.show(10)

In [66]:
new_df= user_df.join(card_data_df,user_df.user_id == card_data_df.client_id,"inner")

In [67]:
new2_df = new_df.join(transaction_df,new_df.client_id == transaction_df.client_id,"inner")

In [68]:
nov_df = new2_df

In [69]:
nov_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- retirement_age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- birth_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- per_capita_income: double (nullable = true)
 |-- yearly_income: double (nullable = true)
 |-- total_debt: double (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- num_credit_cards: integer (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- current_date: date (nullable = false)
 |-- current_new_age: integer (nullable = true)
 |-- dept_to_income_ration: double (nullable = true)
 |-- credit_score_indicator: string (nullable = false)
 |-- card_id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_brand: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- card_number: long (nulla

## Extra Transformations 

In [71]:
nov_df.columns

['user_id',
 'retirement_age',
 'birth_year',
 'birth_month',
 'gender',
 'address',
 'latitude',
 'longitude',
 'per_capita_income',
 'yearly_income',
 'total_debt',
 'credit_score',
 'num_credit_cards',
 'birth_date',
 'current_date',
 'current_new_age',
 'dept_to_income_ration',
 'credit_score_indicator',
 'card_id',
 'client_id',
 'card_brand',
 'card_type',
 'card_number',
 'expires',
 'cvv',
 'has_chip',
 'num_cards_issued',
 'credit_limit',
 'acct_open_date',
 'year_pin_last_changed',
 'card_on_dark_web',
 'current_date',
 'account_age',
 'years_since_pin_change',
 'card_type_category',
 'card_brand_category',
 'Transaction_id',
 'date',
 'client_id',
 'card_id',
 'amount',
 'use_chip',
 'merchant_id',
 'merchant_city',
 'merchant_state',
 'zip',
 'mcc',
 'errors',
 'use_card',
 'use_online',
 'year',
 'month',
 'day',
 'hour_of_day',
 'transaction_date',
 'category']

In [72]:
#Client with Highest Transaction in Each Year

df = nov_df.select("user_id","amount","year")
df = df.groupBy("user_id","year").agg(round(sum("amount"),2).alias("Highest_transaction_amount"))
#df.orderBy(col("year").asc(),col("Highest_transaction_amount").desc()).show(10)

window= Window.partitionBy("year").orderBy(col("Highest_transaction_amount").desc())
df2 = df.withColumn("Rank",row_number().over(window))
df2.filter(col("Rank")==1).show(10)



+-------+----+--------------------------+----+
|user_id|year|Highest_transaction_amount|Rank|
+-------+----+--------------------------+----+
|     96|2010|                1508300.36|   1|
|    989|2011|                1503995.36|   1|
|     96|2012|                1391703.68|   1|
|     96|2013|                1449674.72|   1|
|     96|2014|                 1425183.2|   1|
|     96|2015|                 1481565.2|   1|
|    989|2016|                1539328.48|   1|
|     96|2017|                1466096.76|   1|
|     96|2018|                1479878.48|   1|
|    989|2019|                1190375.52|   1|
+-------+----+--------------------------+----+



In [73]:
#Particular Month Account Openings 

df = nov_df.withColumn("month_year",substring(col("acct_open_date"),1,7)) #.select("month_year")
df = df.groupBy("month_year").agg(count("user_id").alias("total_acc_open")).orderBy(col("month_year").desc())
df.filter(col("month_year")=="2019-10").show()



+----------+--------------+
|month_year|total_acc_open|
+----------+--------------+
|   2019-10|         72023|
+----------+--------------+



In [74]:
#Monthly Transaction Count
montly_df = nov_df.withColumn("month_year",substring(col("date"),1,7))
montly_df.groupBy("month_year").agg(count(col("Transaction_id")).alias("Count_id")).orderBy(col("month_year").desc()).show()


+----------+--------+
|month_year|Count_id|
+----------+--------+
|   2019-10|  450229|
|   2019-09|  439462|
|   2019-08|  455248|
|   2019-07|  455871|
|   2019-06|  440146|
|   2019-05|  452169|
|   2019-04|  436867|
|   2019-03|  453093|
|   2019-02|  407520|
|   2019-01|  453283|
|   2018-12|  453382|
|   2018-11|  439771|
|   2018-10|  451221|
|   2018-09|  439704|
|   2018-08|  455483|
|   2018-07|  454649|
|   2018-06|  439727|
|   2018-05|  452438|
|   2018-04|  439674|
|   2018-03|  454486|
+----------+--------+
only showing top 20 rows



In [75]:
#daily Transaction Count 
daily_df = nov_df.groupBy("transaction_date").agg(count(col("Transaction_id")).alias("Count_id")).orderBy(col("transaction_date").desc())
daily_df.show()

+----------------+--------+
|transaction_date|Count_id|
+----------------+--------+
|      2019-10-31|   15247|
|      2019-10-30|   13829|
|      2019-10-29|   13698|
|      2019-10-28|   15099|
|      2019-10-27|   14897|
|      2019-10-26|   14591|
|      2019-10-25|   14773|
|      2019-10-24|   14810|
|      2019-10-23|   14118|
|      2019-10-22|   13270|
|      2019-10-21|   15581|
|      2019-10-20|   14683|
|      2019-10-19|   15092|
|      2019-10-18|   15213|
|      2019-10-17|   15042|
|      2019-10-16|   13479|
|      2019-10-15|   13343|
|      2019-10-14|   14906|
|      2019-10-13|   14999|
|      2019-10-12|   14867|
+----------------+--------+
only showing top 20 rows



In [76]:
#transaction Count Between Days 
daily_df.filter(col("transaction_date").between("2019-10-12","2019-10-25")).show()

+----------------+--------+
|transaction_date|Count_id|
+----------------+--------+
|      2019-10-25|   14773|
|      2019-10-24|   14810|
|      2019-10-23|   14118|
|      2019-10-22|   13270|
|      2019-10-21|   15581|
|      2019-10-20|   14683|
|      2019-10-19|   15092|
|      2019-10-18|   15213|
|      2019-10-17|   15042|
|      2019-10-16|   13479|
|      2019-10-15|   13343|
|      2019-10-14|   14906|
|      2019-10-13|   14999|
|      2019-10-12|   14867|
+----------------+--------+



In [77]:
#Merchant Id Wise Revenue

nov_df.groupBy("merchant_id","year").agg(round(sum("amount"),2).alias("Total_revenue")).orderBy(col("merchant_id")).show()

+-----------+----+-------------+
|merchant_id|year|Total_revenue|
+-----------+----+-------------+
|          1|2012|      1013.18|
|          2|2010|       204.96|
|          2|2011|       223.36|
|          2|2019|       376.67|
|          3|2012|        85.08|
|          5|2012|      1313.12|
|          5|2011|         98.1|
|          9|2012|      2774.17|
|          9|2010|       121.95|
|          9|2015|       133.17|
|          9|2016|       196.17|
|          9|2017|       127.43|
|          9|2018|       240.35|
|         13|2013|         24.5|
|         13|2018|        124.8|
|         14|2011|        29.12|
|         15|2011|      1500.56|
|         15|2012|      1485.04|
|         15|2010|       429.32|
|         15|2015|      1821.96|
+-----------+----+-------------+
only showing top 20 rows



In [78]:
#Categories wise revenue
nov_df.groupBy("category","year").agg(round(sum("amount"),2).alias("Total_revenue")).orderBy(col("year")).show()

+--------+----+--------------+
|category|year| Total_revenue|
+--------+----+--------------+
|  Travel|2010|    7423365.02|
|   Other|2010|1.6893800475E8|
|    Food|2010| 1.689452057E7|
| Telecom|2010| 3.051385605E7|
|  Retail|2010| 3.637477308E7|
|  Travel|2011|    7145030.15|
| Telecom|2011|  3.15155385E7|
|   Other|2011|1.7112061716E8|
|  Retail|2011| 3.728629274E7|
|    Food|2011| 1.738275669E7|
|  Retail|2012| 3.783195203E7|
|  Travel|2012|    7132030.59|
|   Other|2012|1.7400034132E8|
|    Food|2012| 1.789362537E7|
| Telecom|2012| 3.168730334E7|
| Telecom|2013| 3.208716206E7|
|  Travel|2013|    7257157.46|
|    Food|2013| 1.808092338E7|
|   Other|2013|1.7790051201E8|
|  Retail|2013| 3.867796867E7|
+--------+----+--------------+
only showing top 20 rows



In [79]:
#transaction More than 1.037786502E7 Amount Customers ID 

df = nov_df.groupBy("user_id").agg(round(sum("amount"),2).alias("Highest_transaction")).orderBy(col("Highest_transaction").desc())
df.filter(col("Highest_transaction") > 1.037786502E7).show()

+-------+-------------------+
|user_id|Highest_transaction|
+-------+-------------------+
|     96|       1.42338442E7|
|    989|      1.307669176E7|
|    840|      1.261612116E7|
|    909|       1.23498848E7|
|   1098|      1.215156904E7|
|   1340|      1.084565615E7|
|   1727|      1.072271472E7|
|   1811|        1.0407798E7|
+-------+-------------------+



In [80]:
new2_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- retirement_age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- birth_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- per_capita_income: double (nullable = true)
 |-- yearly_income: double (nullable = true)
 |-- total_debt: double (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- num_credit_cards: integer (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- current_date: date (nullable = false)
 |-- current_new_age: integer (nullable = true)
 |-- dept_to_income_ration: double (nullable = true)
 |-- credit_score_indicator: string (nullable = false)
 |-- card_id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_brand: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- card_number: long (nulla

In [81]:
new2_df=new2_df.drop("current_date")
new2_df=new2_df.drop("client_id")
new2_df = df.withColumn("current_date",date_format(current_date(), "yyyy-MM-dd"))

In [82]:
nov_df

DataFrame[user_id: int, retirement_age: int, birth_year: int, birth_month: int, gender: string, address: string, latitude: double, longitude: double, per_capita_income: double, yearly_income: double, total_debt: double, credit_score: int, num_credit_cards: int, birth_date: date, current_date: date, current_new_age: int, dept_to_income_ration: double, credit_score_indicator: string, card_id: int, client_id: int, card_brand: string, card_type: string, card_number: bigint, expires: date, cvv: int, has_chip: boolean, num_cards_issued: int, credit_limit: double, acct_open_date: date, year_pin_last_changed: date, card_on_dark_web: boolean, current_date: date, account_age: int, years_since_pin_change: int, card_type_category: string, card_brand_category: string, Transaction_id: int, date: timestamp, client_id: int, card_id: int, amount: double, use_chip: string, merchant_id: int, merchant_city: string, merchant_state: string, zip: string, mcc: int, errors: string, use_card: boolean, use_onlin

In [None]:
nov_df.show(10)

In [None]:
from pyspark.sql import functions as f

In [None]:

#range0 = f.beetween("2001-01-01", "2001-31-01")

range0 = (col("transaction_date")>"2010-01-01") & (col("transaction_date")<"2010-12-31")
range1 = (col("transaction_date")>"2011-01-01") & (col("transaction_date")<"2011-12-31")
range3 = (col("transaction_date")== "2012-01-01")

data1 = nov_df.withColumn("Range_bucket",when(range0,">2010-01-01 to <2010-12-31").when(range1,">2011-01-01 to <2011-12-31").when(range3,"==2012-01-01"))

## Exporting the Final Data For BI Team 

In [None]:
nov_df_2 = nov_df_1.withColumn("current_date",date_format(current_date(), "yyyy-MM-dd"))

In [None]:
nov_df_2

In [None]:
#nov_df_2.write.csv("C:/Users/vedant.mahale/Downloads/final_data.csv",header=True)
#nov_df_2.coalesce(1).write.parquet("/user/spark/vedant/transaction_project/")
