In [1]:
#relative paths
import os, sys, inspect

currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir  = os.path.dirname(currentdir)

rootdir = parentdir # is it?
dwhdir = os.path.join(rootdir, 'dwh')

channelChangesFilename      = os.path.join(dwhdir, 'channel_changes.csv')
eventsRefDataFilename       = os.path.join(dwhdir, 'events.csv')
programsRefDataFilename     = os.path.join(dwhdir, 'programs.csv')


print('In :', channelChangesFilename)
print('In :', eventsRefDataFilename)
print('In :', programsRefDataFilename)


In : D:\python_virtualenvs\src\pandas_workshop\dwh\channel_changes.csv
In : D:\python_virtualenvs\src\pandas_workshop\dwh\events.csv
In : D:\python_virtualenvs\src\pandas_workshop\dwh\programs.csv


In [2]:
#other imports
from datetime import datetime
import pandas as pd

pd.options.display.max_rows = 10
pd.options.display.max_colwidth = 180
pd.options.display.width = 180

In [3]:
#useful functions
class Timer(object):

    def __init__(self):
        self.lastTime = None

    def time(self, msg="Time passed", restart=False):
        if restart: self.lastTime = None

        now = datetime.now()
        if self.lastTime:
            print ("{:40s}: {:7.3f}s".format(msg, (now - self.lastTime).total_seconds()))
        
        self.lastTime = now

perfTimer = Timer()


In [4]:


def read_channel_changes_csv(filename):
    df = pd.read_csv(filename,
        names=['Timestamp', 'UserId', 'Channel', 'EventId'],
        header=0 # when passing names, it's assumed the start of the file as the start of data... 
                 # file already has headers in row 0 
    )
    #cut information that we will not need
    df = df[['UserId', 'EventId']].copy()
    df.drop_duplicates(inplace=True)
    return df


def read_events_ref_data_csv(filename):
    df = pd.read_csv(filename,
        names=['EventId', 'ChannelId', 'ProgramId', 'Duration', 'StartDate', 'EndDate'], 
        dtype={'Position': int} 
    )
    return df


def read_programs_ref_data_csv(filename):
    df = pd.read_csv(filename, dtype={'SeriesId': str, 'SeriesSeason': str, 'SeriesEpisode': str})
    # after looking and checking there are NaN's:
    df = df.fillna('')
    return df


In [5]:

perfTimer.time(restart=True)

channelChanges  = read_channel_changes_csv  ( channelChangesFilename)

perfTimer.time("Reading Channel Changes")

eventsRefData   = read_events_ref_data_csv  (  eventsRefDataFilename)

perfTimer.time("Reading Events Ref Data")

programsRefData = read_programs_ref_data_csv(programsRefDataFilename)

perfTimer.time("Reading Programs Ref Data")

print ('\n\nCHANNEL CHANGES:',  channelChanges.shape[0], "\n",  channelChanges.head(1))
print ('\n\nEVENTS         :',   eventsRefData.shape[0], "\n",   eventsRefData.head(1))
print ('\n\nPROGRAMS       :', programsRefData.shape[0], "\n", programsRefData.head(1))


Reading Channel Changes                 :   2.571s
Reading Events Ref Data                 :   0.078s
Reading Programs Ref Data               :   0.109s


CHANNEL CHANGES: 834917 
                              UserId     EventId
0  7f98c243497046a086b09a84d68aa0f1  E/00904866


EVENTS         : 19434 
       EventId ChannelId  ProgramId  Duration            StartDate              EndDate
0  E/00904858    C-0008  P_0002384         5  2017-01-01 23:50:00  2017-01-01 23:55:00


PROGRAMS       : 10000 
    ProgramId ProgramType IsLive                                 ProgramTitle                                     Genres  \
0  P_0000002   RECURRENT         Peta Buccaneer Substantial Tanzania Outlets  health*talkshow*didactic*culturalmagazine   

                                                                     Actors                     Directors  Rating  Year  \
0  Guy Fieri*Sally Martin*Annouck Hautbois*Jesse Tyler Ferguson*George Eads  Niamh Sharkey*Junichi Masuda    3.31  2017   

 

In [6]:
def join_on_col(dfA, dfB, col):
    dfC = dfA.merge(dfB, on=col, how='inner')
    return dfC



In [7]:

def get_recommendations(userId, programId):

    eventsForProgram         = get_events_for_program(programId)
    usersForProgram          = get_users_for_events(eventsForProgram)
    eventIdsSeenByUsers      = get_event_ids_for_users(usersForProgram)
    otherEventIdsSeenByUsers = remove_event_ids_of_program(eventIdsSeenByUsers, eventsForProgram)
    otherEventsSeenByUsers   = join_with_events(otherEventIdsSeenByUsers)
    programsSeenByUsers      = get_programs_for_users(otherEventsSeenByUsers, limit=40)
    
    return programsSeenByUsers


In [39]:
#get a randomUser()

def get_random_user():
    return channelChanges.UserId.sample().iloc[0]

def get_random_program(): # to guarantee that the event actually exists
    change      = channelChanges.sample().iloc[0].to_dict()
    eventId     = change.get('EventId')
    programId   = eventsRefData[eventsRefData.EventId == eventId].iloc[0].ProgramId
    program     = programsRefData[programsRefData.ProgramId == programId] 
    return program.iloc[0].to_dict()



In [40]:
userId = get_random_user()
program = get_random_program()

print("User", userId)
print("Program", program)

programId = program.get('ProgramId')

User 31b0072419f44440b7c33c4a706d2bfa
Program {'ProgramId': 'P_0005876', 'ProgramType': 'RECURRENT', 'IsLive': '', 'ProgramTitle': 'Boils Glancing Oa Rash', 'Genres': 'history*politics*romance*gymnastic', 'Actors': 'Chris Broyles*Richard Hammond*Tom Kenny*Sophie Morton*Sally Martin', 'Directors': 'Emmanuelle Fleury', 'Rating': 4.54, 'Year': 2000, 'Description': 'advocacy clashing angrily defile oncology pinche duckula agrees oleg santosh runoff alexei presumed poisoner orangutan.', 'SeriesId': '', 'SeriesSeason': '', 'SeriesEpisode': ''}


In [41]:

def get_events_for_program(programId):
    events = eventsRefData[eventsRefData.ProgramId == programId]
    return list(events.to_dict()['EventId'].values())

print ("Program", programId, type(programId))

perfTimer.time(restart=True)
eventsForProgram = get_events_for_program(programId)
perfTimer.time("TIMING: events for program")

print ("Found {} events for that program {}".format(len(eventsForProgram), programId))


Program P_0005876 <class 'str'>
timing events for program               :   0.004s
Found 2 events for that program P_0005876


In [42]:

def get_users_for_events(eventList):
    print('eventList', type(eventList), eventList)
    changes = channelChanges[channelChanges.EventId.isin(eventList)].copy()
    print ('get_users_for_events:', changes.shape[0], 'changes')
    users = changes.UserId.unique()
    return users

eventsForProgram = get_events_for_program(programId)

perfTimer.time(restart=True)
usersForProgram = get_users_for_events(eventsForProgram)
perfTimer.time("TIMING: get users for program")

print(len(usersForProgram), 'users found: e.g. ', usersForProgram[0:5])

eventList <class 'list'> ['E/00905067', 'E/00915213']
get_users_for_events: 1164 changes
timing get users for program            :   0.044s
1164 users found: e.g.  ['61a8a48035524095b47c4743d99054aa' '372052c289f7451488d05b60e59f7ec4'
 '290959a61fe74c0a979105e6e8c0ce6f' '46414384e07a496c965e5a5c49135d20'
 '83378d11fd2349b2be09c8ba6b99d5ae']


In [53]:
def get_event_ids_for_users(userList):
    
    userActivity = channelChanges[channelChanges.UserId.isin(userList)].copy()
    print("Total Activity of Users {:7d}".format(userActivity.shape[0]) )
    #unqUserActivity = unqUserActivity.drop_duplicates(['UserId', 'EventId']) # already did it at the source
    activityByEvent = userActivity.groupby(by='EventId', sort=False).count()
    activityByEvent.columns = ['Count'] # rename remaining column (UserId) that will be used to store count to Count
    print('Total Distinct Events   {:7d}'.format(activityByEvent.shape[0]), '\n', activityByEvent.tail(1))
    return activityByEvent

programEvents     = get_events_for_program(programId)
usersForProgram   = get_users_for_events(programEvents)

perfTimer.time(restart=True)
eventsSeenByUsers = get_event_ids_for_users(usersForProgram)
perfTimer.time("TIMING: get events for users")

print ("Completelly unnecessary: Top Events: ")
eventsSortedByTopAudience = eventsSeenByUsers.sort_values(by='Count', ascending=False)
print(eventsSortedByTopAudience.head(2))

eventList <class 'list'> ['E/00905067', 'E/00915213']
get_users_for_events: 1164 changes
Total Activity of Users  417102
Total Distinct Events      5890 
             Count
EventId          
E/00922817      1
TIMING: get events for users            :   0.123s
Completelly unnecessary: Top Events: 
            Count
EventId          
E/00905067   1164
E/00909334    729


In [54]:

def remove_event_ids_of_program(eventDf, eventsToRemove):
    print (eventDf.loc['E/00905067'])
    
    # try to filter on index with not in - time it
    #events = eventDf[ ~eventDf.EventId.isin(eventsToRemove) ] # must filter on index

    #alternative - use column instead of index
    eventsNoIndex = eventDf.reset_index()
    events = eventsNoIndex[ ~eventsNoIndex.EventId.isin(eventsToRemove) ] # must filter on index
    
    return events

eventsForProgram  = get_events_for_program(programId)
usersForProgram   = get_users_for_events(eventsForProgram)
eventsSeenByUsers = get_event_ids_for_users(usersForProgram)

perfTimer.time(restart=True)
otherEvents       = remove_event_ids_of_program(eventsSeenByUsers, eventsForProgram)
perfTimer.time("TIMING: get events for users")

print ("From {} events, we're removing {} eventIds and stayed with {} events".format(
    eventsSeenByUsers.shape[0], len(eventsForProgram), otherEvents.shape[0])
)

eventList <class 'list'> ['E/00905067', 'E/00915213']
get_users_for_events: 1164 changes
Total Activity of Users  417102
Total Distinct Events      5890 
             Count
EventId          
E/00922817      1
Count    1164
Name: E/00905067, dtype: int64
TIMING: get events for users            :   0.000s
From 5890 events, we're removing 2 eventIds and stayed with 5889 events


In [65]:

def join_with_events(eventIdAudience):
    return join_on_col(eventIdAudience, eventsRefData, 'EventId')

eventsForProgram         = get_events_for_program(programId)
usersForProgram          = get_users_for_events(eventsForProgram)
eventIdsSeenByUsers      = get_event_ids_for_users(usersForProgram)
otherEventIdsSeenByUsers = remove_event_ids_of_program(eventIdsSeenByUsers, eventsForProgram)

perfTimer.time(restart=True)
otherEventsSeenByUsers   = join_with_events(otherEventIdsSeenByUsers)
perfTimer.time("TIMING: get events for users")

print("X-Check", otherEventIdsSeenByUsers.shape[0], 'to', otherEventsSeenByUsers.shape[0])
print("Dataframe Check\n", otherEventsSeenByUsers.head(4))


eventList <class 'list'> ['E/00905067', 'E/00915213']
get_users_for_events: 1164 changes
Total Activity of Users  417102
Total Distinct Events      5890 
             Count
EventId          
E/00922817      1
Count    1164
Name: E/00905067, dtype: int64
TIMING: get events for users            :   0.016s
X-Check 5889 to 5889
Dataframe Check
       EventId  Count ChannelId  ProgramId  Duration            StartDate              EndDate
0  E/00904866     57    C-0016  P_0008328        30  2017-01-01 23:45:00  2017-01-02 00:15:00
1  E/00904873    115    C-0031  P_0003056       180  2017-01-01 22:30:00  2017-01-02 01:30:00
2  E/00904872    130    C-0026  P_0009917       120  2017-01-01 23:15:00  2017-01-02 01:15:00
3  E/00904859    145    C-0008  P_0009235        30  2017-01-01 23:55:00  2017-01-02 00:25:00


In [87]:
def get_programs_for_users(events, limit=None):
    #join with events to get programId
    summary = events[['ProgramId', 'Count']]
    programs = summary.groupby('ProgramId', sort=False).sum()
    if limit:
        programs.sort_values(by="Count", ascending=False, inplace=True)
        programs = programs.head(limit)
    
    programs = pd.merge(
        left=programs,         left_index=True,
        right=programsRefData, right_on='ProgramId'
    )
    
    if not limit:
        programs.sort_values(by="Count", ascending=False, inplace=True)
        
    return programs

eventsForProgram         = get_events_for_program(programId)
usersForProgram          = get_users_for_events(eventsForProgram)
eventIdsSeenByUsers      = get_event_ids_for_users(usersForProgram)
otherEventIdsSeenByUsers = remove_event_ids_of_program(eventIdsSeenByUsers, eventsForProgram)
otherEventsSeenByUsers   = join_with_events(otherEventIdsSeenByUsers)

perfTimer.time(restart=True)
programsSeenByUsers      = get_programs_for_users(otherEventsSeenByUsers)
perfTimer.time("TIMING: get all programs for users")

print(programsSeenByUsers.shape[0], 'are all programs')

perfTimer.time(restart=True)
programsSeenByUsers      = get_programs_for_users(otherEventsSeenByUsers, limit=40)
perfTimer.time("TIMING: get some programs for users")

print(programsSeenByUsers.shape[0], 'are some programs')


eventList <class 'list'> ['E/00905067', 'E/00915213']
get_users_for_events: 1164 changes
Total Activity of Users  417102
Total Distinct Events      5890 
             Count
EventId          
E/00922817      1
Count    1164
Name: E/00905067, dtype: int64
TIMING: get all programs for users      :   0.012s
4445 are all programs
TIMING: get some programs for users     :   0.011s
40 are some programs


In [93]:

%timeit get_programs_for_users(otherEventsSeenByUsers)


11.6 ms ± 47.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [94]:

%timeit get_programs_for_users(otherEventsSeenByUsers, limit=40)


7.26 ms ± 52.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [92]:
programs = get_programs_for_users(otherEventsSeenByUsers)
print(programs[['ProgramId','Count']].head(40)) #oops - needs sort

      ProgramId  Count
3644  P_0003646   1119
2593  P_0002595   1035
7399  P_0007401    954
5191  P_0005193    903
2225  P_0002227    860
...         ...    ...
889   P_0000891    679
2902  P_0002904    675
5025  P_0005027    675
832   P_0000834    669
7483  P_0007485    668

[40 rows x 2 columns]


In [91]:
programs = get_programs_for_users(otherEventsSeenByUsers, limit=40)
print(programs[['ProgramId','Count']].head(40))

      ProgramId  Count
3644  P_0003646   1119
2593  P_0002595   1035
7399  P_0007401    954
5191  P_0005193    903
2225  P_0002227    860
...         ...    ...
889   P_0000891    679
2902  P_0002904    675
5025  P_0005027    675
832   P_0000834    669
7483  P_0007485    668

[40 rows x 2 columns]


In [95]:

perfTimer.time(restart=True)

eventsForProgram         = get_events_for_program(programId)
usersForProgram          = get_users_for_events(eventsForProgram)
eventIdsSeenByUsers      = get_event_ids_for_users(usersForProgram)
otherEventIdsSeenByUsers = remove_event_ids_of_program(eventIdsSeenByUsers, eventsForProgram)
otherEventsSeenByUsers   = join_with_events(otherEventIdsSeenByUsers)
programsSeenByUsers      = get_programs_for_users(otherEventsSeenByUsers, limit=40)

perfTimer.time("TIMING: recommendation")


eventList <class 'list'> ['E/00905067', 'E/00915213']
get_users_for_events: 1164 changes
Total Activity of Users  417102
Total Distinct Events      5890 
             Count
EventId          
E/00922817      1
Count    1164
Name: E/00905067, dtype: int64
TIMING: recommendation                  :   0.247s
