In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark
spark = SparkSession.builder.getOrCreate()

In [2]:
spark

In [3]:
cases = spark.read.csv('C:\\Users\\admin\\Desktop\\spark-data\\case.csv',inferSchema=True,header=True)

In [4]:
cases.count()

174

In [5]:
cases.show()

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000007|   Seoul|from other city| true|SMR Newly Planted...|       36|        -|         -|
| 1000008|   Seoul|  Dongdaemun-gu| true|       Dongan Churc

In [6]:
cases = cases.withColumnRenamed('infection_case','infection_source')

In [7]:
cases = cases.select('province','city','infection_source','confirmed')

In [8]:
cases.sort('confirmed').show()

+-----------------+---------------+--------------------+---------+
|         province|           city|    infection_source|confirmed|
+-----------------+---------------+--------------------+---------+
|          Jeju-do|              -|contact with patient|        0|
|       Gangwon-do|              -|contact with patient|        0|
|            Seoul|     Gangseo-gu|SJ Investment Cal...|        0|
|            Busan|from other city|Cheongdo Daenam H...|        1|
|     Jeollabuk-do|from other city|  Shincheonji Church|        1|
|            Seoul|from other city|Anyang Gunpo Past...|        1|
|            Seoul|     Gangnam-gu|Gangnam Dongin Ch...|        1|
|           Sejong|from other city|  Shincheonji Church|        1|
|     Jeollanam-do|from other city|  Shincheonji Church|        1|
|          Jeju-do|from other city|       Itaewon Clubs|        1|
|            Seoul|from other city|Daejeon door-to-d...|        1|
|            Seoul|              -|         Orange Life|      

In [9]:
import pyspark.sql.functions as F

cases.sort(F.desc('confirmed')).show()

+-----------------+---------------+--------------------+---------+
|         province|           city|    infection_source|confirmed|
+-----------------+---------------+--------------------+---------+
|            Daegu|         Nam-gu|  Shincheonji Church|     4511|
|            Daegu|              -|contact with patient|      917|
|            Daegu|              -|                 etc|      747|
| Gyeongsangbuk-do|from other city|  Shincheonji Church|      566|
|      Gyeonggi-do|              -|     overseas inflow|      305|
|            Seoul|              -|     overseas inflow|      298|
|            Daegu|   Dalseong-gun|Second Mi-Ju Hosp...|      196|
| Gyeongsangbuk-do|              -|contact with patient|      190|
|            Seoul|              -|contact with patient|      162|
|            Seoul|     Yongsan-gu|       Itaewon Clubs|      139|
| Gyeongsangbuk-do|              -|                 etc|      133|
|            Daegu|         Seo-gu|Hansarang Convale...|      

In [10]:
import pyspark.sql.types as T


cases.printSchema

<bound method DataFrame.printSchema of DataFrame[province: string, city: string, infection_source: string, confirmed: int]>

In [11]:
cases.dtypes

[('province', 'string'),
 ('city', 'string'),
 ('infection_source', 'string'),
 ('confirmed', 'int')]

In [12]:
cases.withColumn('confirmed',F.col('confirmed').cast(T.DoubleType())).printSchema()

root
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_source: string (nullable = true)
 |-- confirmed: double (nullable = true)



In [13]:
cases.dtypes

[('province', 'string'),
 ('city', 'string'),
 ('infection_source', 'string'),
 ('confirmed', 'int')]

In [14]:
filt = ((cases.confirmed>10)& (cases.province=='Daegu'))

In [15]:
cases.filter(filt).show()

+--------+------------+--------------------+---------+
|province|        city|    infection_source|confirmed|
+--------+------------+--------------------+---------+
|   Daegu|      Nam-gu|  Shincheonji Church|     4511|
|   Daegu|Dalseong-gun|Second Mi-Ju Hosp...|      196|
|   Daegu|      Seo-gu|Hansarang Convale...|      124|
|   Daegu|Dalseong-gun|Daesil Convalesce...|      101|
|   Daegu|     Dong-gu|     Fatima Hospital|       39|
|   Daegu|           -|     overseas inflow|       41|
|   Daegu|           -|contact with patient|      917|
|   Daegu|           -|                 etc|      747|
+--------+------------+--------------------+---------+



In [16]:
cases.groupBy(['province','city']).agg(F.sum('confirmed'),F.max('confirmed'),F.min('confirmed')).show()

+----------------+---------------+--------------+--------------+--------------+
|        province|           city|sum(confirmed)|max(confirmed)|min(confirmed)|
+----------------+---------------+--------------+--------------+--------------+
|Gyeongsangnam-do|       Jinju-si|             9|             9|             9|
|           Seoul|        Guro-gu|           139|            95|             3|
|           Seoul|     Gangnam-gu|            18|             7|             1|
|         Daejeon|              -|           100|            55|            15|
|    Jeollabuk-do|from other city|             6|             3|             1|
|Gyeongsangnam-do|Changnyeong-gun|             7|             7|             7|
|           Seoul|              -|           561|           298|             1|
|         Jeju-do|from other city|             1|             1|             1|
|Gyeongsangbuk-do|              -|           345|           190|            22|
|Gyeongsangnam-do|   Geochang-gun|      

In [17]:
regions = spark.read.csv('C:\\Users\\admin\\Desktop\\spark-data\\Region.csv',inferSchema=True,header=True)

In [18]:
regions.show()

+-----+--------+-------------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
| code|province|         city| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+-----+--------+-------------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|10000|   Seoul|        Seoul|37.566953|126.977977|                    607|               830|              48|         1.44|                   15.38|                5.8|             22739|
|10010|   Seoul|   Gangnam-gu|37.518421|127.047222|                     33|                38|               0|         4.18|                   13.17|                4.3|              3088|
|10020|   Seoul|  Gangdong-gu|37.530492|127.123837

In [19]:
cases1 = cases.join(regions,['province','city'],how='left')

In [20]:
cases1.limit(10).toPandas()

Unnamed: 0,province,city,infection_source,confirmed,code,latitude,longitude,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count
0,Seoul,Yongsan-gu,Itaewon Clubs,139,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0
1,Seoul,Gwanak-gu,Richway,119,10050.0,37.47829,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0
2,Seoul,Guro-gu,Guro-gu Call Center,95,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0
3,Seoul,Yangcheon-gu,Yangcheon Table Tennis Club,43,10190.0,37.517189,126.866618,30.0,43.0,0.0,2.26,13.55,5.5,816.0
4,Seoul,Dobong-gu,Day Care Center,43,10100.0,37.668952,127.047082,23.0,26.0,1.0,0.95,17.89,7.2,485.0
5,Seoul,Guro-gu,Manmin Central Church,41,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0
6,Seoul,from other city,SMR Newly Planted Churches Group,36,,,,,,,,,,
7,Seoul,Dongdaemun-gu,Dongan Church,17,10110.0,37.574552,127.039721,21.0,31.0,4.0,1.06,17.26,6.7,832.0
8,Seoul,from other city,Coupang Logistics Center,25,,,,,,,,,,
9,Seoul,Gwanak-gu,Wangsung Church,30,10050.0,37.47829,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0


In [21]:
cases.join(F.broadcast(regions),['province','city'],how='left').toPandas()

Unnamed: 0,province,city,infection_source,confirmed,code,latitude,longitude,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count
0,Seoul,Yongsan-gu,Itaewon Clubs,139,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0
1,Seoul,Gwanak-gu,Richway,119,10050.0,37.478290,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0
2,Seoul,Guro-gu,Guro-gu Call Center,95,10070.0,37.495632,126.887650,26.0,34.0,3.0,1.00,16.21,5.7,741.0
3,Seoul,Yangcheon-gu,Yangcheon Table Tennis Club,43,10190.0,37.517189,126.866618,30.0,43.0,0.0,2.26,13.55,5.5,816.0
4,Seoul,Dobong-gu,Day Care Center,43,10100.0,37.668952,127.047082,23.0,26.0,1.0,0.95,17.89,7.2,485.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
169,Gyeongsangnam-do,-,etc,20,,,,,,,,,,
170,Jeju-do,-,overseas inflow,14,,,,,,,,,,
171,Jeju-do,-,contact with patient,0,,,,,,,,,,
172,Jeju-do,-,etc,4,,,,,,,,,,


In [22]:
cases

DataFrame[province: string, city: string, infection_source: string, confirmed: int]

In [23]:
regions

DataFrame[code: int, province: string, city: string, latitude: double, longitude: double, elementary_school_count: int, kindergarten_count: int, university_count: int, academy_ratio: double, elderly_population_ratio: double, elderly_alone_ratio: double, nursing_home_count: int]

In [24]:
from pyspark.sql import types as T

def caseshighlow(confirmed):
    if confirmed < 50:
        return 'low'
    else:
        return 'high'
    
    
    

In [25]:
caseshighlowUDF = F.udf(caseshighlow, T.StringType())

caseswithighlowUDF = cases.withColumn("HighLow",caseshighlowUDF('confirmed'))

caseswithighlowUDF.show()

+--------+---------------+--------------------+---------+-------+
|province|           city|    infection_source|confirmed|HighLow|
+--------+---------------+--------------------+---------+-------+
|   Seoul|     Yongsan-gu|       Itaewon Clubs|      139|   high|
|   Seoul|      Gwanak-gu|             Richway|      119|   high|
|   Seoul|        Guro-gu| Guro-gu Call Center|       95|   high|
|   Seoul|   Yangcheon-gu|Yangcheon Table T...|       43|    low|
|   Seoul|      Dobong-gu|     Day Care Center|       43|    low|
|   Seoul|        Guro-gu|Manmin Central Ch...|       41|    low|
|   Seoul|from other city|SMR Newly Planted...|       36|    low|
|   Seoul|  Dongdaemun-gu|       Dongan Church|       17|    low|
|   Seoul|from other city|Coupang Logistics...|       25|    low|
|   Seoul|      Gwanak-gu|     Wangsung Church|       30|    low|
|   Seoul|   Eunpyeong-gu|Eunpyeong St. Mar...|       14|    low|
|   Seoul|   Seongdong-gu|    Seongdong-gu APT|       13|    low|
|   Seoul|

In [26]:
df = spark.read.csv('C:\\Users\\admin\\Desktop\\spark-data\\syn_fraud_data.csv',inferSchema=True,header=True)

In [27]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [28]:
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [29]:
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [30]:
df = df.select('type','amount','oldbalanceOrg','newbalanceOrig','isFraud')

In [31]:
df.show(5)

+--------+--------+-------------+--------------+-------+
|    type|  amount|oldbalanceOrg|newbalanceOrig|isFraud|
+--------+--------+-------------+--------------+-------+
| PAYMENT| 9839.64|     170136.0|     160296.36|      0|
| PAYMENT| 1864.28|      21249.0|      19384.72|      0|
|TRANSFER|   181.0|        181.0|           0.0|      1|
|CASH_OUT|   181.0|        181.0|           0.0|      1|
| PAYMENT|11668.14|      41554.0|      29885.86|      0|
+--------+--------+-------------+--------------+-------+
only showing top 5 rows



In [32]:
df.count()

6362620

In [33]:
train, test = df.randomSplit([0.7,0.3],seed=7)

In [34]:
train.count()

4453790

In [35]:
test.count()

1908830

In [36]:
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [37]:
catcols = [ x for (x,dataType) in train.dtypes if dataType == 'string']

numcols = [x for (x,dataType) in train.dtypes if ((dataType == 'double') & (x != 'isFraud'))]

In [38]:
print(catcols)
print(numcols)

['type']
['amount', 'oldbalanceOrg', 'newbalanceOrig']


In [39]:
train.select('type').distinct().show()

+--------+
|    type|
+--------+
|TRANSFER|
| CASH_IN|
|CASH_OUT|
| PAYMENT|
|   DEBIT|
+--------+



In [40]:
train.groupBy('type').count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 372981|
| CASH_IN| 980154|
|CASH_OUT|1565206|
| PAYMENT|1506515|
|   DEBIT|  28934|
+--------+-------+



In [41]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer


string_indexer = [StringIndexer(inputCol=x,outputCol=x + "_StringIndexer",handleInvalid="skip")
                              for x in catcols]

In [44]:
indexer = StringIndexer(inputCol='type',outputCol='type_indexed')
indexed = indexer.fit(train).transform(train)
indexed.show(5)

+-------+------+-------------+--------------+-------+------------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|type_indexed|
+-------+------+-------------+--------------+-------+------------+
|CASH_IN|  4.35|   4136277.22|    4136281.57|      0|         2.0|
|CASH_IN|  4.58|      94241.0|      94245.58|      0|         2.0|
|CASH_IN|  4.71|      50198.0|      50202.71|      0|         2.0|
|CASH_IN|  5.19|      18104.0|      18109.19|      0|         2.0|
|CASH_IN|  5.44|          0.0|          5.44|      0|         2.0|
+-------+------+-------------+--------------+-------+------------+
only showing top 5 rows



In [46]:
indexed.count()
print(train.count())

4453790


In [49]:
encoder = OneHotEncoderEstimator(inputCols=['type_indexed'],outputCols=['typeVec'],dropLast=True)

In [50]:
model = encoder.fit(indexed)
encoded = model.transform(indexed)
encoded.show(5)

+-------+------+-------------+--------------+-------+------------+-------------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|type_indexed|      typeVec|
+-------+------+-------------+--------------+-------+------------+-------------+
|CASH_IN|  4.35|   4136277.22|    4136281.57|      0|         2.0|(4,[2],[1.0])|
|CASH_IN|  4.58|      94241.0|      94245.58|      0|         2.0|(4,[2],[1.0])|
|CASH_IN|  4.71|      50198.0|      50202.71|      0|         2.0|(4,[2],[1.0])|
|CASH_IN|  5.19|      18104.0|      18109.19|      0|         2.0|(4,[2],[1.0])|
|CASH_IN|  5.44|          0.0|          5.44|      0|         2.0|(4,[2],[1.0])|
+-------+------+-------------+--------------+-------+------------+-------------+
only showing top 5 rows



In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler



In [None]:
string_indexer

In [None]:
one_hot_encoder = [
    OneHotEncoderEstimator(
        inputCols=[f"{x}_StringIndexer" for x in catcols],
        outputCols=[f"{x}_OneHotEncoder" for x in catcols],
)]

In [None]:
from pyspark.ml.feature import VectorAssembler 

In [52]:
assemblerinput = [ x for x in catcols]
assemblerinput += [f"{x}_OneHotEncoder" for x in catcols]

In [55]:
assemblerinput

['type', 'type_OneHotEncoder']

AttributeError: 'str' object has no attribute 'items'

In [None]:
vector_assembler = VectorAssembler(inputCols=assemblerinput,outputCol="VectorAssembler_features")

In [None]:
stages = []
stages += string_indexer
stages += one_hot_encoder
stages +=[vector_assembler]

In [None]:
stages

In [None]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(test)