# Advanced Azure Data Analytics Day : Azure Synapse Analytics Training For Game Industry


- ## Churn Prediction Using Azure Synapse Analytics
- ## Step 1. Data Preprocessing



# 1. Import Spark Lib


In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# 2. Load Raw Data Set


In [2]:
# abfss://{filesystem}@{storage acount}.{domain}/{path}/{filename}.{fileformat}
# Character_Log_Data(사용자 캐릭터 로그 데이터)
wow_log_df = spark.read.load('abfss://aaafilesystem@aaatraing.dfs.core.windows.net/sparkpooldata/WoW_Logs.csv', format='csv', header=True)

# InGame_Log_Data(인게임 캐릭터 위치 로그 데이터)
zones_df = spark.read.load('abfss://aaafilesystem@aaatraing.dfs.core.windows.net/sparkpooldata/zones.csv', format='csv', header=True)

# Churn_Label_Log_Data(이탈 여부 태그 로그 데이터)
churners_df = spark.read.load('abfss://aaafilesystem@aaatraing.dfs.core.windows.net/sparkpooldata/churners.csv', format='csv', header=True)

# 3. Preprocess Data
## 3.1 Rename Column name


In [3]:
# Character_Log_Data(사용자 캐릭터 로그 데이터)
wow_log_df = wow_log_df.withColumnRenamed('char', 'IdentifierId').withColumnRenamed('zone', 'zoneId')\
            .withColumnRenamed('timestamp', 'log_timestamp')

# InGame_Log_Data(인게임 캐릭터 위치 로그 데이터)
zones_df = zones_df.withColumnRenamed('Zone_Name', 'zoneId')

# Churn_Label_Log_Data(이탈 여부 태그 로그 데이터)
churners_df = churners_df.withColumnRenamed('char', 'IdentifierId')\
                         .withColumnRenamed('timestamp', 'churn_timestamp')

## 3.2 Convert DataType ( log_timestamp , churn_timestamp )
- #### String to timestamp( unix_timestamp → to_timestamp )

In [4]:
wow_log_df = wow_log_df.withColumn('log_timestamp',unix_timestamp(col('log_timestamp'),'E MMM dd HH:mm:ss z yyyy'))
wow_log_df = wow_log_df.withColumn("log_timestamp",to_timestamp(col('log_timestamp')))
display(wow_log_df)

churners_df = churners_df.withColumn('churn_timestamp',unix_timestamp(col('churn_timestamp'),'E MMM dd HH:mm:ss z yyyy'))
churners_df = churners_df.withColumn('churn_timestamp',to_timestamp(col('churn_timestamp')))

In [6]:
display(churners_df)

## 3.3 Extract Values From brackets
- ### https://regex101.com/r/Ra7VjU/1


In [5]:
# Character_Log_Data(사용자 캐릭터 로그 데이터)
wow_log_df = wow_log_df.withColumn('IdentifierId', regexp_extract(col('IdentifierId'), r"\(([^()]+)\)", 1))\
.withColumn('zoneId', regexp_extract(col('zoneId'), r"\(([^()]+)\)", 1)) #8,654,936
# InGame_Log_Data(인게임 캐릭터 위치 로그 데이터)
zones_df = zones_df.withColumn('zoneId', regexp_extract(col('zoneId'), r"\(([^()]+)\)", 1)) #160

# Churn_Label_Log_Data(이탈 여부 태그 로그 데이터)
churners_df = churners_df.withColumn('IdentifierId', regexp_extract(col('IdentifierId'), r"\(([^()]+)\)", 1)) #14,579

## 3.4 Join DataFrme by Column( IdentifierId , zoneId )


In [6]:
# Character_Log_Data(사용자 캐릭터 로그 데이터) ∩ InGame_Log_Data(인게임 캐릭터 위치 로그 데이터)
wow_log_join = wow_log_df.join(zones_df, wow_log_df.zoneId == zones_df.zoneId).drop(zones_df.zoneId)
print('Character ∩ InGame =', wow_log_join.count(), len(wow_log_join.columns))

# Character_Log_Data(사용자 캐릭터 로그 데이터) ∩ InGame_Log_Data(인게임 캐릭터 위치 로그 데이터) ∩ Churn_Label_Log_Data(이탈 여부 태그 로그 데이터)
wow_log_result_join = wow_log_join.join(churners_df, wow_log_join.IdentifierId == churners_df.IdentifierId).drop(churners_df.IdentifierId)
print('Character ∩ InGame ∩ Churn_Label =', wow_log_result_join.count(), len(wow_log_result_join.columns))
display(wow_log_result_join)

## 3.5 Save File & Table



In [17]:
wow_log_result_join.createOrReplaceTempView("pysparkdftemptable") #temptable에 저장
wow_log_result_join.write.mode("overwrite").saveAsTable('sparkwowdataset') # sparkpool에 저장
wow_log_result_join.write.mode("overwrite").save('abfss://aaafilesystem@aaatraing.dfs.core.windows.net/sparkpooldata/churnersdf_yj3.csv', format='csv', header=True) #ADLS에 저장

In [22]:
%%spark
val scala_df = spark.sqlContext.sql ("select * from pysparkdftemptable")
scala_df.write.mode("overwrite").synapsesql("aaasqlpool.dbo.notebooksqlwowdata", Constants.INTERNAL) //Sql pool에 저장

scala_df: org.apache.spark.sql.DataFrame = [IdentifierId: string, level: string ... 19 more fields]