In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
# Reuse the running SparkContext if there is one; otherwise start a new one.
sc = SparkContext.getOrCreate()
# Two SQLContext wrappers are created over the same SparkContext.
# NOTE(review): `spark` is not referenced by any later cell shown in this notebook;
# every read and query goes through `sqlContext` — confirm before removing it.
spark = SQLContext(sc)
sqlContext = SQLContext(sc)

In [2]:
# Read the raw log CSV (comma separated, first row is the header) and let the
# reader infer column types.
# NOTE(review): the location is an absolute, machine-specific path.
wow_data = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(delimiter=',', header='true', inferSchema='true')
    .load('file:////home/deepannrr3037/newlogs.csv/')
)

In [3]:
# Show the column names and inferred types of the loaded frame.
wow_data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- QueryTime: string (nullable = true)
 |-- QuerySeq: integer (nullable = true)
 |-- AvatarID: integer (nullable = true)
 |-- Guild: string (nullable = true)
 |-- Level: integer (nullable = true)
 |-- Race: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Zone: string (nullable = true)



In [4]:
from pyspark.sql.functions import unix_timestamp
# Parse the QueryTime string into a timestamp column, then derive a date-only
# column from it. The 'QuertTime_*' spelling is kept because the SQL in a later
# cell selects QuertTime_D by that exact name.
# NOTE(review): the pattern reads the string as year/day/month with a 12-hour
# `hh` field. The df.show() output further down lists login dates such as
# 2001-06-20 next to 2012-05-31 and activity spans of 4017 days, which may mean
# the raw strings are really month/day/year — verify against the CSV. The cutoff
# dates hard-coded in later cells ('2011-11-30', '2012-05-31') would have to be
# revised together with the pattern.
query_ts = unix_timestamp(wow_data.QueryTime, "yy/dd/MM hh:mm:ss").cast("timestamp")
wow_data = wow_data.withColumn('QuertTime_TS', query_ts)
query_date = wow_data.QuertTime_TS.cast("date")
wow_data = wow_data.withColumn('QuertTime_D', query_date)

In [5]:
# Make the frame (including the parsed timestamp/date columns) queryable from SQL
# under the name "wow_data".
wow_data.registerTempTable( "wow_data" )

In [6]:
# Rebuild `wow_data` as an encoded feature frame, one output row per input row:
#   - Guild: 0 when the raw value is exactly a single space, otherwise 1.
#   - Level: bucketed to 2 (levels 1-23), 3 (levels 24-47) or 4 (everything else).
#   - Race_* / Class_*: 0/1 indicator columns; the compared literals carry a
#     leading space (e.g. ' Orc', ' Warrior').
#   - LoginTime: the date-only column QuertTime_D derived earlier.
# The Python name `wow_data` is rebound here, while the "wow_data" temp table keeps
# pointing at the previous frame until it is registered again in a later cell.
wow_data = sqlContext.sql( """select AvatarID,case when Guild = ' ' then 0 else 1 end as Guild, 
                          case when Level between 1 and 23 then 2 
                               when Level between 24 and 47 then 3 else 4 end as Level,Race,
                          case when Race = ' Orc' then 1 else 0 end as Race_Orc,
                          case when Race = ' Tauren' then 1 else 0 end as Race_Tauren,
                          case when Race = ' Troll' then 1 else 0 end as Race_Troll,
                          case when Race = ' Undead' then 1 else 0 end as Race_Undead,
                          case when Class = ' Warrior' then 1 else 0 end as Class_Warrior,
                          case when Class = ' Hunter' then 1 else 0 end as Class_Hunter,
                          case when Class = ' Rogue' then 1 else 0 end as Class_Rogue,
                          case when Class = ' Shaman' then 1 else 0 end as Class_Shaman,
                          case when Class = ' Warlock' then 1 else 0 end as Class_Warlock,
                          case when Class = ' Druid' then 1 else 0 end as Class_Druid,
                          case when Class = ' Mage' then 1 else 0 end as Class_Mage,
                          case when Class = ' Priest' then 1 else 0 end as Class_Priest,
                          QuertTime_D as LoginTime                         
from wow_data""" )

In [7]:
from pyspark.sql.functions import month, year, dayofmonth,concat

In [8]:
# Encode each login date as a yyyymmdd integer (e.g. 2006-01-20 -> 20060120).
# Year-month-day ordering with fixed-width month and day keeps the value unique
# per date and chronologically sortable; concatenating the unpadded day, month
# and year would map 1 November and 11 January of the same year to one number.
wow_data = wow_data.withColumn(
    'LoginTime_i',
    (year(wow_data.LoginTime) * 10000
     + month(wow_data.LoginTime) * 100
     + dayofmonth(wow_data.LoginTime)).cast("integer"))

In [9]:
# Keep only the avatar id and login date for the per-day login counts.
wow_data_Login = wow_data[['AvatarID', 'LoginTime']]

In [10]:
# Expose the (AvatarID, LoginTime) projection to SQL as "wow_data_Login".
wow_data_Login.registerTempTable( "wow_data_Login" )

In [11]:
# Collapse to one row per (AvatarID, LoginTime) with the number of log rows on
# that date as Loginperday. The Python name is rebound to the aggregated frame.
wow_data_Login =  sqlContext.sql( """select AvatarID,LoginTime,count(*) as Loginperday from wow_data_Login group by AvatarID,LoginTime""")

In [12]:
# Number of distinct (AvatarID, LoginTime) pairs.
wow_data_Login.count()

447127

In [13]:
# Re-register "wow_data" so the SQL name now refers to the encoded feature frame
# (with LoginTime and LoginTime_i) instead of the raw load.
wow_data.registerTempTable( "wow_data" )

In [14]:
# Per-avatar activity summary:
#   Last_login_dt / first_login_dt - latest and earliest login date
#   total_days_actv                - days between the first and last login date
#   actual_days_actv               - number of distinct login dates
#   Login_count                    - number of rows with a non-null LoginTime
wow_derived = sqlContext.sql( """select AvatarID,max(LoginTime) as Last_login_dt,min(LoginTime) as first_login_dt,datediff(max(LoginTime),min(LoginTime)) as total_days_actv,count(distinct(LoginTime)) as actual_days_actv,
count(LoginTime)as Login_count from wow_data group by AvatarID""")

In [15]:
# Number of avatars in the summary (one row per AvatarID).
wow_derived.count()

26165

In [16]:
# Expose the per-avatar summary to SQL as "wow_derived". This registration
# happens before the filter applied to `wow_derived` in a later cell.
wow_derived.registerTempTable( "wow_derived" )

In [17]:
# Re-register "wow_data_Login" so the SQL name now refers to the aggregated
# per-day frame that carries the Loginperday column.
wow_data_Login.registerTempTable( "wow_data_Login" )

Filter the data so that each avatar can be labelled as churned or not churned.

In [18]:
# Keep avatars whose first and last login fall on different dates, or whose last
# login is after 2011-11-30. Only the Python name is rebound: the "wow_derived"
# temp table registered earlier still refers to the unfiltered summary.
has_multi_day_span = wow_derived['total_days_actv'] > 0
seen_after_cutoff = wow_derived['Last_login_dt'] > '2011-11-30'
wow_derived = wow_derived.filter(has_multi_day_span | seen_after_cutoff)

In [19]:
# Expose the filtered per-avatar summary to SQL under a separate name,
# "wow_derived_1".
wow_derived.registerTempTable( "wow_derived_1" )

In [20]:
# Join the per-day login counts with the filtered per-avatar summary on AvatarID
# (comma join plus WHERE, i.e. an inner join). Login_rt_ld is the number of days
# between the avatar's last login and the row's login date; Login_count is
# exposed as Total_login. Rows are ordered by avatar, most recent login first.
df = sqlContext.sql("""select d.AvatarID,LoginTime,Last_login_dt,datediff(Last_login_dt,LoginTime) as Login_rt_ld,Loginperday,total_days_actv,actual_days_actv,Login_count 
as Total_login from wow_data_Login l, wow_derived_1 d where l.AvatarID = d.AvatarID 
 order by d.AvatarID,LoginTime desc""")

In [21]:
# Row count after the join (one row per avatar and login date).
df.count()

436896

In [22]:
# Preview the joined rows.
df.show()

+--------+----------+-------------+-----------+-----------+---------------+----------------+-----------+
|AvatarID| LoginTime|Last_login_dt|Login_rt_ld|Loginperday|total_days_actv|actual_days_actv|Total_login|
+--------+----------+-------------+-----------+-----------+---------------+----------------+-----------+
|       0|2012-05-31|   2012-05-31|          0|          1|           4017|               4|         76|
|       0|2001-06-20|   2012-05-31|       3998|          2|           4017|               4|         76|
|       0|2001-06-07|   2012-05-31|       4011|          4|           4017|               4|         76|
|       0|2001-06-01|   2012-05-31|       4017|         69|           4017|               4|         76|
|       1|2012-05-31|   2012-05-31|          0|          1|           4017|               3|         19|
|       1|2001-06-07|   2012-05-31|       4011|          1|           4017|               3|         19|
|       1|2001-06-01|   2012-05-31|       4017|        

In [23]:
from pyspark.sql.functions import datediff,to_date,lit
# Keep rows of avatars whose last login is at most 365 days before the fixed
# reference date 2012-05-31.
reference_date = to_date(lit('2012-05-31'))
df = df.filter(datediff(reference_date, df['Last_login_dt']) <= 365)

In [24]:
# Row count after the last-login recency filter.
df.count()

267185

In [25]:
from pyspark.sql.functions import col

# Percentage of the first-to-last-login span on which the avatar actually logged in.
active_share = df['actual_days_actv'] / df['total_days_actv']
df = df.withColumn("Active_Ratio_1", active_share * 100)

In [26]:
# Preview the rows with the new Active_Ratio_1 column.
df.show()

+--------+----------+-------------+-----------+-----------+---------------+----------------+-----------+-------------------+
|AvatarID| LoginTime|Last_login_dt|Login_rt_ld|Loginperday|total_days_actv|actual_days_actv|Total_login|     Active_Ratio_1|
+--------+----------+-------------+-----------+-----------+---------------+----------------+-----------+-------------------+
|       0|2012-05-31|   2012-05-31|          0|          1|           4017|               4|         76|0.09957679860592482|
|       0|2001-06-20|   2012-05-31|       3998|          2|           4017|               4|         76|0.09957679860592482|
|       0|2001-06-07|   2012-05-31|       4011|          4|           4017|               4|         76|0.09957679860592482|
|       0|2001-06-01|   2012-05-31|       4017|         69|           4017|               4|         76|0.09957679860592482|
|       1|2012-05-31|   2012-05-31|          0|          1|           4017|               3|         19|0.07468259895444362|


In [27]:
# Reduce to the distinct ids of avatars that were active on more than 40% of
# the days in their activity span.
is_active = df['Active_Ratio_1'] > 40.0
df = df.filter(is_active).select("AvatarID").distinct()

In [28]:
# Tag every remaining avatar with the constant label Churn = 0.
df = df.withColumn('Churn', lit(0))

In [29]:
# Preview the (AvatarID, Churn) label frame.
df.show()

+--------+-----+
|AvatarID|Churn|
+--------+-----+
|   22097|    0|
|   22346|    0|
|   22373|    0|
|   22521|    0|
|   23015|    0|
|   23571|    0|
|   24347|    0|
|   25462|    0|
|   25591|    0|
|   26087|    0|
|   22004|    0|
|   22021|    0|
|   22188|    0|
|   22412|    0|
|   22555|    0|
|   22970|    0|
|   23136|    0|
|   23455|    0|
|   23523|    0|
|   23607|    0|
+--------+-----+
only showing top 20 rows



In [30]:
# Expose the Churn = 0 label frame to SQL as "df".
df.registerTempTable( "df" )

In [31]:
# One feature row per avatar: activity totals from the per-avatar summary plus
# the per-row indicator columns collapsed with max(). The Level bucket (2/3/4)
# of the avatar's highest level is expanded into Level_l / Level_m / Level_h.
# NOTE(review): this query reads the "wow_derived" temp table, which was
# registered before the filter on `wow_derived`; the filtered summary is
# registered as "wow_derived_1". The distinct AvatarID count printed below
# (26165) equals the unfiltered summary count — confirm that including the
# filtered-out avatars here is intended.
df_2 =sqlContext.sql("""select l.AvatarID,max(Total_days_actv) as Total_days_actv,max(Actual_days_actv) as Actual_days_actv,max(Login_count) as Total_login,
                                       case when max(Level) = 2 then 1 else 0 end as Level_l ,
                                       case when max(Level) = 3 then 1 else 0 end as Level_m ,
                                       case when max(Level) = 4 then 1 else 0 end as Level_h ,
                                       max(Race_Orc) as Race_Orc,
                                       max(Race_Tauren) as Race_Tauren,
                                       max(Race_Troll) as Race_Troll,
                                       max(Race_Undead) as Race_Undead,
                                       max(Class_Warrior) as Class_Warrior,
                                       max(Class_Hunter) as Class_Hunter,
                                       max(Class_Rogue) as Class_Rogue,
                                       max(Class_Shaman) as Class_Shaman,
                                       max(Class_Warlock) as Class_Warlock,
                                       max(Class_Druid) as Class_Druid,
                                       max(Class_Mage) as Class_Mage,
                                       max(Class_Priest) as Class_Priest                                       
                                       from wow_data l, wow_derived d where
                                       l.AvatarID = d.AvatarID
                                       group by l.AvatarID""")

In [32]:
# Expose the per-avatar feature frame to SQL as "df_2".
df_2.registerTempTable( "df_2" )

In [33]:
# Sanity check: number of rows in the encoded log frame.
wow_data.count()

9701530

In [34]:
# Sanity check: number of avatars in the filtered summary (the Python variable,
# not the "wow_derived" temp table).
wow_derived.count()

15934

In [35]:
# Sanity check: number of distinct avatars in the feature frame.
df_2.select("AvatarID").distinct().count()

26165

In [36]:
# Preview the per-avatar feature rows.
df_2.show()

+--------+---------------+----------------+-----------+-------+-------+-------+--------+-----------+----------+-----------+-------------+------------+-----------+------------+-------------+-----------+----------+------------+
|AvatarID|Total_days_actv|Actual_days_actv|Total_login|Level_l|Level_m|Level_h|Race_Orc|Race_Tauren|Race_Troll|Race_Undead|Class_Warrior|Class_Hunter|Class_Rogue|Class_Shaman|Class_Warlock|Class_Druid|Class_Mage|Class_Priest|
+--------+---------------+----------------+-----------+-------+-------+-------+--------+-----------+----------+-----------+-------------+------------+-----------+------------+-------------+-----------+----------+------------+
|     148|            748|              66|       1202|      0|      0|      1|       0|          1|         0|          0|            0|           0|          0|           1|            0|          0|         0|           0|
|     463|           3307|             158|       2412|      0|      0|      1|       0|        

In [37]:
# Attach the label to every avatar in the feature frame. The left outer join
# keeps all rows of df_2; avatars with no match in the Churn = 0 label frame get
# a NULL b.Churn, so the CASE falls through to ELSE and they are labelled
# Churn = 1. Active_Ratio is recomputed here as Actual_days_actv /
# Total_days_actv * 100.
# NOTE(review): avatars with Total_days_actv = 0 may get a NULL Active_Ratio —
# confirm how the downstream consumer handles missing values.
df_7 =sqlContext.sql("""select a.AvatarID,Total_days_actv,Actual_days_actv,(Actual_days_actv/Total_days_actv)*100 as Active_Ratio,Total_login,Level_l,Level_m,Level_h,Race_Orc,Race_Tauren,Race_Troll,Race_Undead,
Class_Warrior,Class_Hunter,Class_Rogue,Class_Shaman,Class_Warlock,Class_Druid,Class_Mage,Class_Priest,
case when b.Churn = 0 then 0 else 1 end as Churn
from df_2 a LEFT OUTER JOIN df b on a.AvatarID = b.AvatarID""") 

In [38]:
# Peek at five rows whose Churn label is non-zero.
df_7.where(df_7.Churn != 0 ).take(5)

[Row(AvatarID=148, Total_days_actv=748, Actual_days_actv=66, Active_Ratio=8.823529411764707, Total_login=1202, Level_l=0, Level_m=0, Level_h=1, Race_Orc=0, Race_Tauren=1, Race_Troll=0, Race_Undead=0, Class_Warrior=0, Class_Hunter=0, Class_Rogue=0, Class_Shaman=1, Class_Warlock=0, Class_Druid=0, Class_Mage=0, Class_Priest=0, Churn=1),
 Row(AvatarID=463, Total_days_actv=3307, Actual_days_actv=158, Active_Ratio=4.7777441790142126, Total_login=2412, Level_l=0, Level_m=0, Level_h=1, Race_Orc=0, Race_Tauren=0, Race_Troll=0, Race_Undead=1, Class_Warrior=0, Class_Hunter=0, Class_Rogue=0, Class_Shaman=0, Class_Warlock=0, Class_Druid=0, Class_Mage=0, Class_Priest=1, Churn=1),
 Row(AvatarID=471, Total_days_actv=12, Actual_days_actv=11, Active_Ratio=91.66666666666666, Total_login=253, Level_l=0, Level_m=0, Level_h=1, Race_Orc=1, Race_Tauren=0, Race_Troll=0, Race_Undead=0, Class_Warrior=1, Class_Hunter=0, Class_Rogue=0, Class_Shaman=0, Class_Warlock=0, Class_Druid=0, Class_Mage=0, Class_Priest=0, C

In [39]:
# Number of labelled avatar rows.
df_7.count()

26165

In [40]:
import pandas as pd
# Bring the whole labelled frame to the driver as a pandas DataFrame.
# toPandas() collects every row and keeps the Spark column names, so the export
# does not depend on a hard-coded row count that would silently truncate the
# result if the input data grows.
df_8 = df_7.toPandas()

In [41]:
# Write the labelled dataset to a comma-separated UTF-8 file in the current
# working directory.
# NOTE(review): no `index=` argument is passed, so the pandas default applies and
# the row index should be written as an extra unnamed first column — confirm that
# downstream readers expect it.
df_8.to_csv("wow_data_wtLabel_C.csv", sep=',', encoding='utf-8')