In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import *

In [2]:
spark=SparkSession.builder.appName('user_Rollup1').getOrCreate()

In [3]:
df_users=spark.read.csv('users.csv',header=True,inferSchema=True)
df_cards=spark.read.csv('cards.csv',header=True,inferSchema=True)
df_groups=spark.read.csv('groups.csv',header=True,inferSchema=True)
df_channels=spark.read.csv('channels.csv',header=True,inferSchema=True)

In [4]:
df_users.columns

['name',
 'time',
 'event_time',
 'user_id',
 'event',
 'attributes',
 'org_id',
 'analytics_version',
 'platform',
 'user_agent',
 'edited_user_id',
 'changed_column',
 'old_val',
 'new_val',
 'added_role',
 'removed_role',
 'topic_id',
 'topic_label',
 'topic_name',
 'user_role',
 'created_user_id',
 'follower_id',
 'followed_user_id',
 'member_user_id',
 'skill_id',
 'full_name',
 'gender',
 'org_hostname']

In [5]:
df_cards.columns

['name',
 'time',
 'event_time',
 'user_id',
 'card_id',
 'event',
 'attributes',
 'org_id',
 'analytics_version',
 'platform',
 'changed_column',
 'old_val',
 'new_val',
 'org_hostname',
 'channel_id',
 'journey_id',
 'pathway_id',
 'comment_id',
 '_c18',
 '_c19',
 '_c20',
 '_c21',
 '_c22',
 '_c23',
 '_c24',
 '_c25',
 '_c26',
 '_c27']

In [6]:
df_groups.columns

['name',
 'time',
 'event_time',
 'event',
 'user_id',
 'group_id',
 'analytics_version',
 'card_id',
 'changed_column',
 'new_val',
 'old_val',
 'group_name',
 'group_user_id',
 'group_user_role',
 'user_full_name',
 'user_handle',
 'channel_id',
 'channel_name',
 'org_hostname',
 'org_id',
 '_c20']

In [7]:
df_channels.columns

['name',
 'time',
 'event_time',
 'event',
 'user_id',
 'channel_id',
 'analytics_version',
 'org_id',
 'changed_column',
 'old_val',
 'new_val',
 'curator_id',
 'user_full_name',
 'request_id',
 'channel_name',
 '_c15']

In [8]:
for column in [column for column in df_cards.columns if column not in df_users.columns]:
    df_users = df_users.withColumn(column, lit(None))

for column in [column for column in df_users.columns if column not in df_cards.columns]:
    df_cards = df_cards.withColumn(column, lit(None))

In [9]:
base_report=df_users.unionByName(df_cards)

In [10]:
for column in [column for column in df_groups.columns if column not in base_report.columns]:
    base_report = base_report.withColumn(column, lit(None))

for column in [column for column in base_report.columns if column not in df_groups.columns]:
    df_groups = df_groups.withColumn(column, lit(None))

In [11]:
base_report=base_report.unionByName(df_groups)

In [12]:
for column in [column for column in df_channels.columns if column not in base_report.columns]:
    base_report = base_report.withColumn(column, lit(None))

for column in [column for column in base_report.columns if column not in df_channels.columns]:
    df_channels = df_channels.withColumn(column, lit(None))

In [13]:
base_report=base_report.unionByName(df_channels)

In [14]:
base_report = base_report.withColumn('utc_time',from_unixtime("event_time"))

In [15]:
base_report.columns

['name',
 'time',
 'event_time',
 'user_id',
 'event',
 'attributes',
 'org_id',
 'analytics_version',
 'platform',
 'user_agent',
 'edited_user_id',
 'changed_column',
 'old_val',
 'new_val',
 'added_role',
 'removed_role',
 'topic_id',
 'topic_label',
 'topic_name',
 'user_role',
 'created_user_id',
 'follower_id',
 'followed_user_id',
 'member_user_id',
 'skill_id',
 'full_name',
 'gender',
 'org_hostname',
 'card_id',
 'channel_id',
 'journey_id',
 'pathway_id',
 'comment_id',
 '_c18',
 '_c19',
 '_c20',
 '_c21',
 '_c22',
 '_c23',
 '_c24',
 '_c25',
 '_c26',
 '_c27',
 'group_id',
 'group_name',
 'group_user_id',
 'group_user_role',
 'user_full_name',
 'user_handle',
 'channel_name',
 'curator_id',
 'request_id',
 '_c15',
 'utc_time']

In [16]:
base_report = base_report.withColumn('year',year("utc_time"))
base_report = base_report.withColumn('month',month("utc_time"))
base_report = base_report.withColumn('day',to_date("utc_time"))
base_report = base_report.withColumn('hour',hour("utc_time"))

In [17]:
base_report = base_report.drop('utc_time')

In [18]:
user_rollup1 = base_report.groupBy("user_id","year","month","day","hour").pivot("event").agg(count("event"))
user_rollup1.toPandas()

Unnamed: 0,user_id,year,month,day,hour,null,card_added_to_channel,card_added_to_journey,card_added_to_pathway,card_bookmarked,...,user_expertise_topic_removed,user_followed,user_interest_topic_added,user_interest_topic_removed,user_logged_in,user_role_added,user_role_removed,user_skill_created,user_skill_edited,user_unfollowed
0,827336,2021.0,9.0,2021-09-30,14.0,,,,,,...,,,,,4.0,,,,,
1,321199,2021.0,9.0,2021-09-30,6.0,,,,,,...,,,,,1.0,,,,,
2,823421,2021.0,9.0,2021-09-24,19.0,,,,,,...,,,,,1.0,,,,,
3,695585,2021.0,9.0,2021-09-28,16.0,,,6.0,6.0,,...,,,,,2.0,,,,,
4,925297,2021.0,9.0,2021-09-27,18.0,,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9339,694017,2021.0,9.0,2021-09-20,18.0,,,,,,...,,,,,,,,,,
9340,981377,2021.0,9.0,2021-09-30,16.0,,,,,,...,,,,,,,,,,
9341,115888,2021.0,10.0,2021-10-01,16.0,,,,,,...,,,,,,,,,,
9342,825135,2021.0,9.0,2021-09-29,2.0,,,,,,...,,,,,,,,,,


In [19]:
user_rollup1 = user_rollup1.na.fill(value=0)
user_rollup1 = user_rollup1.drop('null')

In [20]:
user_rollup1.columns

['user_id',
 'year',
 'month',
 'day',
 'hour',
 'card_added_to_channel',
 'card_added_to_journey',
 'card_added_to_pathway',
 'card_bookmarked',
 'card_comment_created',
 'card_comment_deleted',
 'card_created',
 'card_created_virtual',
 'card_dismissed',
 'card_edited',
 'card_liked',
 'card_marked_as_complete',
 'card_marked_as_uncomplete',
 'card_published',
 'card_viewed',
 'channel_created',
 'channel_curator_added',
 'channel_visited',
 'group_channel_followed',
 'group_created',
 'group_deleted',
 'group_edited',
 'group_user_added',
 'group_user_edited',
 'group_user_removed',
 'group_visited',
 'user_activity_window_started',
 'user_created',
 'user_edited',
 'user_expertise_topic_added',
 'user_expertise_topic_removed',
 'user_followed',
 'user_interest_topic_added',
 'user_interest_topic_removed',
 'user_logged_in',
 'user_role_added',
 'user_role_removed',
 'user_skill_created',
 'user_skill_edited',
 'user_unfollowed']

In [21]:
card_rollup1 = base_report.filter(base_report.event.contains("card"))
card_rollup1 = card_rollup1.groupBy("card_id","year","month","day","hour").pivot("event").agg(count("event"))
card_rollup1.toPandas()

Unnamed: 0,card_id,year,month,day,hour,card_added_to_channel,card_added_to_journey,card_added_to_pathway,card_bookmarked,card_comment_created,card_comment_deleted,card_created,card_created_virtual,card_dismissed,card_edited,card_liked,card_marked_as_complete,card_marked_as_uncomplete,card_published,card_viewed
0,8352280,2021,9,2021-09-22,19,,,,,,,1.0,,,3.0,,,,,
1,8356278,2021,9,2021-09-24,11,,,,,,,,,,,,1.0,,,
2,8355324,2021,9,2021-09-23,15,,,,1.0,,,1.0,,,3.0,,,,,
3,8360448,2021,9,2021-09-29,0,1.0,,,,,,,,,,,,,,
4,8361242,2021,9,2021-09-29,14,,,,1.0,,,1.0,,,3.0,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
18052,513589,2021,10,2021-10-01,15,,,,,,,,,,2.0,,,,,
18053,410252,2021,10,2021-10-01,15,,,,,,,,,,2.0,,,,,
18054,410895,2021,10,2021-10-01,15,,,,,,,,,,2.0,,,,,
18055,8359859,2021,9,2021-09-28,14,,,,,,,1.0,,,3.0,,,,,


In [22]:
card_rollup1 = card_rollup1.na.fill(value=0)
card_rollup1 = card_rollup1.drop('null')

In [23]:
card_rollup1.columns

['card_id',
 'year',
 'month',
 'day',
 'hour',
 'card_added_to_channel',
 'card_added_to_journey',
 'card_added_to_pathway',
 'card_bookmarked',
 'card_comment_created',
 'card_comment_deleted',
 'card_created',
 'card_created_virtual',
 'card_dismissed',
 'card_edited',
 'card_liked',
 'card_marked_as_complete',
 'card_marked_as_uncomplete',
 'card_published',
 'card_viewed']

In [24]:
user_rollup1.toPandas().to_csv('user_rollup1.csv', index=False)
card_rollup1.toPandas().to_csv('card_rollup1.csv', index=False)
base_report.toPandas().to_csv('base_Report1.csv', index=False) 