In [1]:
# Import PySpark
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session that runs locally on all available cores
# with the driver memory setting raised to 10 GB.
spark = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.driver.memory", "10g")
    .appName('Exam')
    .getOrCreate()
)
from pyspark.sql.functions import *

In [2]:
%%html
<style>
/* Size the <pre> of stdout stream outputs to its content width.
   NOTE(review): presumably so wide DataFrame.show() tables are not wrapped;
   these class names look like the classic Notebook front end - confirm. */
.output_subarea.output_text.output_stream.output_stdout > pre {
  width:max-content;
}
/* Same rule for rendered-text outputs.
   NOTE(review): the jp- prefixed classes look like JupyterLab - confirm. */
.p-Widget.jp-RenderedText.jp-OutputArea-output > pre {
  width:max-content;
}
</style>

In [3]:
%%time
# Read the Parquet dataset into `df`; this frame is the input for every cell below.
parquet_path = '/Users/karla/Documents/Docs/Credit Cards Transactions/tablas_juntasf.parquet'
df = spark.read.parquet(parquet_path)

CPU times: total: 0 ns
Wall time: 2.57 s


In [4]:
# Null count per column: each isNull() flag is cast to 0/1 and added up.
# `sum` is the Spark aggregate brought in by the wildcard import, not the builtin.
null_counts = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
df.select(*null_counts).show()

+------+----------+-------------+---------+----------+------+-------+---------+----+-----+-------+--------+---------+-----------------------+-------------------+---------+---------+--------------+---+---------+---------+--------+----------+-------+---+-------+-----------+-----------+------------+------------------+-------------+----+----+----+-----+---+----+------+--------+-------------+-------------+--------------+-------+---+--------+--------+
|Person|CurrentAge|RetirementAge|BirthYear|BirthMonth|Gender|Address|Apartment|City|State|Zipcode|Latitude|Longitude|PerCapitaIncome-Zipcode|YearlyIncome-Person|TotalDebt|FICOScore|NumCreditCards| id|CARDINDEX|CardBrand|CardType|CardNumber|Expires|CVV|HasChip|CardsIssued|CreditLimit|AcctOpenDate|YearPINlastChanged|CardonDarkWeb|User|Card|Year|Month|Day|Time|Amount|Use_Chip|Merchant_Name|Merchant_City|Merchant_State|    Zip|MCC|  Errors|Is_Fraud|
+------+----------+-------------+---------+----------+------+-------+---------+----+-----+-------+--

In [5]:
# Remove the columns that none of the cells below reference.
dropped_columns = ['MCC', 'Errors', 'Apartment']
df = df.drop(*dropped_columns)

# Limpieza e Ingeniería

In [6]:
# Two-letter postal codes of the 50 US states plus the District of Columbia,
# in alphabetical order. Used below to tell domestic merchants from the rest.
us_states = (
    "AK AL AR AZ CA CO CT DC DE FL GA HI IA ID IL IN KS KY LA MA MD "
    "ME MI MN MO MS MT NC ND NE NH NJ NM NV NY OH OK OR PA RI SC SD "
    "TN TX UT VA VT WA WI WV WY"
).split()

In [7]:
from pyspark.sql.types import FloatType, DecimalType


def _currency_to_float(source_name, output_name=None):
    """Return a FloatType column parsed from a text-formatted numeric column.

    Every character except digits, the decimal point and the minus sign is
    removed before the cast. The minus sign is kept on purpose: the previous
    pattern ``[^\\d.]`` stripped it as well, so any negative value (for example
    a refund, if the data contains them) silently became positive.

    The pattern is a raw string, so ``\\d`` reaches the regex engine unchanged
    instead of relying on Python tolerating an invalid escape sequence.

    Args:
        source_name: name of the column to clean.
        output_name: name of the resulting column; defaults to ``source_name``.

    Returns:
        A Column expression cast to FloatType and aliased to ``output_name``.
    """
    digits_only = regexp_replace(col(source_name), r"[^\d.\-]", "")
    return digits_only.cast(FloatType()).alias(output_name or source_name)


# Re-select every column in its original order. The monetary columns are
# converted to floats (and the two hyphenated names get underscores instead),
# and missing merchant locations are labelled "ONLINE".
df = df.select(
    'Person',
    'CurrentAge',
    'RetirementAge',
    'BirthYear',
    'BirthMonth',
    'Gender',
    'Address',
    'City',
    'State',
    'Zipcode',
    'Latitude',
    'Longitude',
    _currency_to_float("PerCapitaIncome-Zipcode", "PerCapitaIncome_Zipcode"),
    _currency_to_float("YearlyIncome-Person", "YearlyIncome_Person"),
    _currency_to_float("TotalDebt"),
    'FICOScore',
    'NumCreditCards',
    'id',
    'CARDINDEX',
    'CardBrand',
    'CardType',
    'CardNumber',
    'Expires',
    'CVV',
    'HasChip',
    'CardsIssued',
    _currency_to_float("CreditLimit"),
    'AcctOpenDate',
    'YearPINlastChanged',
    'CardonDarkWeb',
    'User',
    'Card',
    'Year',
    'Month',
    'Day',
    'Time',
    _currency_to_float("Amount"),
    'Use_Chip',
    'Merchant_Name',
    'Merchant_City',
    # A null merchant state / zip is labelled "ONLINE"; the TransType cell
    # below keys on that literal.
    when(col("Merchant_State").isNull(),
         "ONLINE").otherwise(col("Merchant_State")).alias("Merchant_State"),
    when(col("Zip").isNull(), "ONLINE").otherwise(col("Zip")).alias("Zip"),
    'Is_Fraud',
)

In [8]:
# Classify every transaction by merchant location:
#   "US"     - Merchant_State is one of the codes in `us_states`
#   "ONLINE" - Merchant_State carries the "ONLINE" placeholder set during cleaning
#   "ABROAD" - anything else
# withColumn is used instead of select("*", ...) so that re-running the cell
# replaces TransType rather than appending a second column with the same name,
# which would make later col("TransType") references ambiguous.
df = df.withColumn(
    "TransType",
    when(col("Merchant_State").isin(us_states), "US")
    .when(col("Merchant_State") == "ONLINE", "ONLINE")
    .otherwise("ABROAD"),
)

In [9]:
# Split Amount into one column per transaction type: the column holds Amount
# when the row is of that type and 0 otherwise.
# withColumn is used instead of select("*", ...) so that re-running the cell
# replaces the three columns rather than appending duplicates.
# NOTE(review): because non-matching rows get 0 (not null), the avg() taken
# over these columns in the aggregation cell is computed over ALL of a
# customer's transactions, not only those of the given type - confirm this
# is the intended feature.
amount_columns = {"US": "USAmount", "ONLINE": "OnlineAmount", "ABROAD": "AbroadAmount"}
for trans_type, amount_column in amount_columns.items():
    df = df.withColumn(
        amount_column,
        when(col("TransType") == trans_type, col("Amount")).otherwise(0),
    )

In [10]:
# Preview the cleaned frame, including the new TransType and per-type amount columns.
df.show()

+------------+----------+-------------+---------+----------+------+---------------+----------+-----+-------+--------+---------+-----------------------+-------------------+---------+---------+--------------+---+---------+----------+--------+----------------+-------+---+-------+-----------+-----------+------------+------------------+-------------+----+----+----+-----+---+-----+------+------------------+--------------------+-------------+--------------+------+--------+---------+--------+------------+------------+
|      Person|CurrentAge|RetirementAge|BirthYear|BirthMonth|Gender|        Address|      City|State|Zipcode|Latitude|Longitude|PerCapitaIncome_Zipcode|YearlyIncome_Person|TotalDebt|FICOScore|NumCreditCards| id|CARDINDEX| CardBrand|CardType|      CardNumber|Expires|CVV|HasChip|CardsIssued|CreditLimit|AcctOpenDate|YearPINlastChanged|CardonDarkWeb|User|Card|Year|Month|Day| Time|Amount|          Use_Chip|       Merchant_Name|Merchant_City|Merchant_State|   Zip|Is_Fraud|TransType|USAm

# TAD

In [11]:
# Build the summary table `f`: one row for every distinct combination of the
# customer attributes in `customer_keys`, with spending statistics per row.
customer_keys = ['Person', 'CurrentAge', 'FICOScore', 'Zipcode',
                 'YearlyIncome_Person', 'TotalDebt', 'NumCreditCards']

# Every monetary aggregate is cast to DECIMAL(10, 2).
money = DecimalType(10, 2)

summary_columns = [
    # Overall spending statistics.
    sum("Amount").cast(money).alias("TotalAmount"),
    avg("Amount").cast(money).alias("AvgAmount"),
    max("Amount").cast(money).alias("MaxAmount"),
    min("Amount").cast(money).alias("MinAmount"),
    # Transaction counts; count() skips nulls, so each when() without an
    # otherwise() counts only the rows of that transaction type.
    count("id").alias("TotalTransactions"),
    count(when(col('TransType') == 'US', True)).alias('USTrans'),
    count(when(col('TransType') == 'ONLINE', True)).alias('OnlineTrans'),
    count(when(col('TransType') == 'ABROAD', True)).alias('AbroadTrans'),
    # Averages of the per-type amount columns.
    avg("USAmount").cast(money).alias("AvgUSAmount"),
    avg("OnlineAmount").cast(money).alias("AvgOnlineAmount"),
    avg("AbroadAmount").cast(money).alias("AvgAbroadAmount"),
]

f = df.groupBy(customer_keys).agg(*summary_columns)

In [12]:
# Preview the aggregated per-customer table.
f.show()

+----------------+----------+---------+-------+-------------------+---------+--------------+-----------+---------+---------+---------+-----------------+-------+-----------+-----------+-----------+---------------+---------------+
|          Person|CurrentAge|FICOScore|Zipcode|YearlyIncome_Person|TotalDebt|NumCreditCards|TotalAmount|AvgAmount|MaxAmount|MinAmount|TotalTransactions|USTrans|OnlineTrans|AbroadTrans|AvgUSAmount|AvgOnlineAmount|AvgAbroadAmount|
+----------------+----------+---------+-------+-------------------+---------+--------------+-----------+---------+---------+---------+-----------------+-------+-----------+-----------+-----------+---------------+---------------+
|     Reign Scott|        30|      640|  45245|            51998.0|  73575.0|             1|  192817.50|    82.58|  1286.38|     0.53|             2335|   2174|        141|         20|      68.40|          12.42|           1.76|
|     Raina Anwar|        40|      716|  74855|            36152.0|  81345.0|       

In [13]:
# Number of rows in the aggregated table (one per distinct grouping-key combination).
f.count()

2000

# Clientes repetidos

In [14]:
# How many aggregated rows share each person name, most repeated names first.
rows_per_person = f.groupBy("Person").count()
rows_per_person.orderBy(col("count").desc()).show()

+----------------+-----+
|          Person|count|
+----------------+-----+
|  Lochlan Morris|    2|
|     Rory Nelson|    2|
|  Hazel Robinson|    2|
|Magdalena Farhad|    2|
|   Abril Labelle|    2|
| Casey El-Mafouk|    2|
|   Cassidy Anwar|    2|
|  Cristian Adams|    1|
|   Belen Stewart|    1|
|     Cali Brooks|    1|
|Colette Phillips|    1|
|     Amir Leroux|    1|
|    Alia Stewart|    1|
|  Harrison Green|    1|
|   Ingrid Thomas|    1|
| Markus Masvidal|    1|
|      Addyson Xi|    1|
| Savannah Nguyen|    1|
|      Blake Sadr|    1|
|   Vivian Rogers|    1|
+----------------+-----+
only showing top 20 rows



In [15]:
# Inspect the rows of one of the repeated names to see how they differ.
repeated_name = "Magdalena Farhad"
f.where(col("Person") == repeated_name).show()

+----------------+----------+---------+-------+-------------------+---------+--------------+-----------+---------+---------+---------+-----------------+-------+-----------+-----------+-----------+---------------+---------------+
|          Person|CurrentAge|FICOScore|Zipcode|YearlyIncome_Person|TotalDebt|NumCreditCards|TotalAmount|AvgAmount|MaxAmount|MinAmount|TotalTransactions|USTrans|OnlineTrans|AbroadTrans|AvgUSAmount|AvgOnlineAmount|AvgAbroadAmount|
+----------------+----------+---------+-------+-------------------+---------+--------------+-----------+---------+---------+---------+-----------------+-------+-----------+-----------+-----------+---------------+---------------+
|Magdalena Farhad|        45|      663|  34232|            38872.0|  94970.0|             4| 1239062.40|    74.25|   874.66|     0.09|            16687|  15796|        862|         29|      68.14|           5.79|           0.32|
|Magdalena Farhad|        36|      719|  88045|            31414.0|  42233.0|       

Notemos que los registros repetidos difieren en edad y código postal; por lo tanto, de cada persona conservaremos únicamente el registro con la edad mayor (el más reciente).

In [16]:
from pyspark.sql import Window

# Keep exactly one row per person name: the one with the highest CurrentAge.
# The previous orderBy(...).coalesce(1).dropDuplicates(['Person']) did not
# guarantee which duplicate survives, because dropDuplicates makes no promise
# to honour a preceding sort. Ranking rows inside a window makes the choice
# explicit; ties on age are broken by transaction count.
# NOTE(review): rows that share a name but differ in age, zip code and FICO
# score may be different customers rather than duplicates - consider keying
# on the `User` column instead of `Person`.
person_window = Window.partitionBy("Person").orderBy(
    col("CurrentAge").desc(),
    col("TotalTransactions").desc(),
)
f = (
    f.withColumn("_age_rank", row_number().over(person_window))
     .filter(col("_age_rank") == 1)
     .drop("_age_rank")  # helper column only; the schema of `f` is unchanged
)

In [17]:
# Row count after de-duplication: one row per distinct Person name.
f.count()

1993

# Guardamos

In [18]:
# Persist the final table as CSV (Spark writes a directory of part files at this path).
#  - header=True: without it the file has no column names, so its 18 columns
#    cannot be identified when it is read back.
#  - mode("overwrite"): the default mode raises an error when the path already
#    exists, so the notebook could not be re-run from top to bottom.
(
    f.write
     .format("csv")
     .option("header", True)
     .mode("overwrite")
     .save("/Users/karla/Documents/Docs/Credit Cards Transactions/Exam_TAD.csv")
)