In [1]:
import datetime
import os
import time

import numpy as np
import pandas as pd

import pyspark.sql.functions as fn
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, IndexToString

import seaborn as sns
sns.set(style="darkgrid")
%matplotlib inline
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = [16,18]

In [2]:
spark = SparkSession.builder.appName("Ads").getOrCreate()
# Kept only for backward compatibility with cells that may still reference it;
# reading below goes through the SparkSession, which supersedes SQLContext.
sqlContext = SQLContext(spark.sparkContext)

# Location of the cart_item_purchase parquet sample. Set the
# CART_ITEM_PURCHASE_PATH environment variable to point elsewhere instead of
# editing the notebook; the default is the original author's local path.
DATA_PATH = os.environ.get(
    "CART_ITEM_PURCHASE_PATH",
    "/Users/theinfamouswayne/Documents/SametimeFileTransfers/coopsw_v2/Sample/cart_item_purchase/",
)
df = spark.read.parquet(DATA_PATH)

### <font color='silver'>Faster toPandas()</font>

In [6]:
def _map_to_pandas(rdds):
    """Turn the rows of one partition into a one-element list holding a pandas.DataFrame.

    Meant for ``RDD.mapPartitions``. It must live at module level (not be a
    lambda/closure) so Spark can pickle it and ship it to the executors.

    :param rdds:    iterator over the rows of a single partition
    :return:        ``[pandas.DataFrame]`` with positional integer column labels
    """
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
    repartitioned if `n_partitions` is passed.

    Each partition is converted to pandas on the executors and the pieces are
    concatenated on the driver in partition order.

    :param df:              pyspark.sql.DataFrame
    :param n_partitions:    int or None; when given, `df` is repartitioned first
    :return:                pandas.DataFrame with `df`'s column names and a fresh
                            0..n-1 index (empty, but with the right columns, when
                            `df` has no rows)
    """
    if n_partitions is not None:
        df = df.repartition(n_partitions)
    frames = df.rdd.mapPartitions(_map_to_pandas).collect()
    # An empty partition becomes a DataFrame with no columns at all; drop those
    # so they cannot leave the result with fewer columns than `df`.
    frames = [frame for frame in frames if not frame.empty]
    if not frames:
        # No rows anywhere (or no partitions): pd.concat([]) would raise.
        return pd.DataFrame(columns=list(df.columns))
    # Every partition's frame is numbered from 0; ignore_index avoids duplicate
    # index labels in the combined frame.
    df_pand = pd.concat(frames, ignore_index=True)
    df_pand.columns = df.columns
    return df_pand

## Building a Binary Product Basket per "sessionId"

In [7]:
# One row per (productEAN, sessionId) pair, so repeat purchases inside a session count once.
product = df.select('productEAN', 'sessionId').distinct()
# Basket size: number of distinct products in each session.
basket = (product
          .groupby('sessionId')
          .agg(fn.countDistinct('productEAN').alias('productBasket')))

#### <font color='silver'>Label Encoding</font>

In [8]:
# Assign every distinct productEAN an integer label in a new `indexEAN` column;
# these labels are used below as the positions in each basket vector.
stringIndexer = StringIndexer(outputCol='indexEAN', inputCol='productEAN')
pipeline = Pipeline(stages=[stringIndexer])
pipelineFit = pipeline.fit(product)       # learn the productEAN -> indexEAN mapping
dataset = pipelineFit.transform(product)  # (productEAN, sessionId, indexEAN) rows

#### <font color='silver'>Reverse Mapping</font>

In [9]:
# productEAN -> productName lookup. An EAN can occur with several names; keep
# the first one so every EAN maps to exactly one label.
to_join = toPandas(df.select('productName', 'productEAN').distinct())
to_join = to_join.drop_duplicates('productEAN')
to_join['productEAN'] = to_join['productEAN'].astype(int)
to_join = to_join.set_index('productEAN')

# productEAN -> indexEAN exactly as assigned by the StringIndexer above. The
# labels are read straight from `dataset`; an IndexToString pass is not needed
# because its output column would only be dropped again.
idx_pd = (toPandas(dataset.select('productEAN', 'indexEAN').distinct())
          .drop_duplicates('productEAN'))
idx_pd['productEAN'] = idx_pd['productEAN'].astype(int)
idx_pd = idx_pd.set_index('productEAN')

# Index to Names mapping, ordered by indexEAN so that row k names vector column k.
idx_to_name = idx_pd.join(to_join, on='productEAN', how='left')
idx_to_name = idx_to_name.sort_values('indexEAN')

# Later cells take product names positionally; that is only correct if the
# labels are exactly 0..n-1 (e.g. two EAN strings collapsing to the same int
# after astype(int) would silently shift every following name).
assert (idx_to_name['indexEAN'].values == np.arange(len(idx_to_name))).all(), \
    "indexEAN labels are not contiguous 0..n-1; product names would be misaligned"

#### <font color='silver'>Sparse Binary Vector</font>

In [10]:
from pyspark.sql.functions import collect_list, max, lit, udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import DoubleType

In [11]:
def encode(arr, length):
    """Encode one session's product labels as a binary sparse vector.

    :param arr:     collection of label indices (the ``indexEAN`` values collected
                    for one session); values are truncated to int
    :param length:  size of the vector (number of distinct product labels)
    :return:        pyspark.ml.linalg.SparseVector with 1.0 at every index in ``arr``
    """
    # De-duplicate and sort so each index appears once, in strictly increasing
    # order. A repeated index would make the "binary" vector malformed: its dot
    # products and norms would count that product more than once.
    indices = sorted(set(int(i) for i in arr))
    return Vectors.sparse(length, indices, [1.0] * len(indices))
# Spark UDF wrapper: (array of labels, vector length) -> VectorUDT column.
encode_udf = udf(encode, VectorUDT())

In [12]:
# Vector length = highest product label + 1 (labels start at 0).
feats = dataset.agg(fn.max('indexEAN')).first()[0] + 1
# Per session: gather its product labels, then encode them as a binary sparse vector.
basket_binary = (
    dataset
    .groupby('sessionId')
    .agg(collect_list('indexEAN').alias('arrayEAN'))
    .select('sessionId', encode_udf('arrayEAN', lit(feats)).alias('Basket'))
)

## Calculating Pairwise Correlation Between Products

In [13]:
# Pull every session's binary basket to the driver and report how long it took.
t0 = time.time()
basket_pd = toPandas(basket_binary)
time.time() - t0  # elapsed wall-clock seconds (cell output)

43.525646924972534

#### Converting to a NumPy Matrix

In [14]:
# Stack each session's sparse basket into one dense float matrix: one row per
# session (row order of basket_pd), one column per product label. This replaces
# the earlier object-array + apply_along_axis round trip with the same result.
# NOTE: the dense matrix needs n_sessions * n_products * 8 bytes of memory.
features = np.vstack([vec.toArray() for vec in basket_pd['Basket']])

## <font color='darkblue'> "***********" TESTING "***********"</font>
### <font color='red'> Without Converting to Pandas</font>

#### <font color='Red'> SessionID to SessionIndex</font>

In [17]:
# Give every sessionId an integer label (`indexSession`); it becomes the row
# index of the distributed matrix built in the next cell.
stringIndexer = StringIndexer(outputCol='indexSession', inputCol='sessionId')
pipeline = Pipeline(stages=[stringIndexer])
pipelineFit = pipeline.fit(basket_binary)           # learn sessionId -> indexSession
sparkBasket = pipelineFit.transform(basket_binary)  # adds the indexSession column

In [33]:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# Row Format: [index = indexSession, vector = dense binary basket].
# Columns are looked up by name rather than position (previously row[2] and
# row[1]), so the mapping cannot silently break if sparkBasket's column order changes.
sparkFeatures = IndexedRowMatrix(
    sparkBasket.rdd.map(
        lambda row: IndexedRow(row['indexSession'], np.array(row['Basket'].toArray()))
    )
)

In [None]:
st = time.time()
# np.array() over a list of IndexedRow objects only yields a 1-D object array,
# and collect() order is not tied to the session index. Instead, sort the rows
# by index and stack their vectors into a dense (sessions x products) matrix.
indexed_rows = sorted(sparkFeatures.rows.collect(), key=lambda r: r.index)
if indexed_rows:
    matrix = np.vstack([r.vector.toArray() for r in indexed_rows])
else:
    matrix = np.empty((0, 0))
time.time() - st

#### <font color='Red'> Directly Without SessionIndex</font>

In [None]:
st = time.time()
# Densify each session's Basket on the executors and stack the collected rows.
matrix2 = np.array(
    basket_binary.rdd
    .map(lambda row: np.array(row.Basket.toArray()))
    .collect()
)
time.time() - st

## <font color='darkblue'> "***********" OVER "***********"</font><br>
## Correlation (phi) from numpy matrix

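<img src='https://i.stack.imgur.com/V9dfd.jpg'></img>

For column $j$ of $A$ and column $k$ of $B$ over $N$ baskets (here $A = B$ is the basket matrix), the next cell computes

$$\phi_{kj} = \frac{N\sum_i B_{ik}A_{ij} - \sum_i B_{ik}\sum_i A_{ij}}{\sqrt{\left(N\sum_i B_{ik}^2 - \left(\sum_i B_{ik}\right)^2\right)\left(N\sum_i A_{ij}^2 - \left(\sum_i A_{ij}\right)^2\right)}}$$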

In [15]:
# A and B are the same view of `features` (rows = sessions, columns = products):
# every product is correlated against every other product of the same matrix.
A = features[:, :]
B = A

In [16]:
# Pearson correlation between every column of B (rows of phi) and every column
# of A (columns of phi); for binary baskets this is the phi coefficient.

# Get number of rows in either A or B
N = B.shape[0]

# Column-wise sums of A and B; each is used in more than one term below
sA = A.sum(0)
sB = B.sum(0)

# The formula has four parts; compute them one by one
p1 = N*np.dot(B.T, A)  # same as N*np.einsum('ij,ik->kj', A, B)
p2 = sA*sB[:, None]
p3 = N*((B**2).sum(0)) - (sB**2)
p4 = N*((A**2).sum(0)) - (sA**2)

# A zero-variance column (a product in every basket, or in none) makes both
# numerator and denominator 0, i.e. 0/0 = NaN for that row/column. Keep the
# NaNs but silence the divide/invalid warnings they trigger.
with np.errstate(divide='ignore', invalid='ignore'):
    phi = (p1 - p2)/np.sqrt(p4*p3[:, None])

# Round-off can push entries (notably the diagonal) slightly outside [-1, 1],
# which would break sqrt(2*(1 - phi)) downstream. np.clip leaves NaN untouched.
phi = np.clip(phi, -1.0, 1.0)

# Element with the largest |phi| in each column. NaNs are ranked lowest, so an
# all-NaN column yields NaN instead of np.nanargmax raising ValueError.
# NOTE(review): since A and B are the same matrix here, the maximum is the
# diagonal self-correlation, so `out` is 1.0 for every non-constant column.
abs_phi = np.abs(phi)
best_row = np.where(np.isnan(abs_phi), -np.inf, abs_phi).argmax(axis=0)
out = phi[best_row, np.arange(phi.shape[1])]

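#### Pairwise Node-Distance Matrix

Each correlation is turned into a distance $d_{kj} = \sqrt{2\,(1-\phi_{kj})}$. This is $0$ for perfectly correlated products and $2$ for perfectly anti-correlated ones.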

In [17]:
import math as m
def corrDist(ele):
    """Correlation distance sqrt(2 * (1 - ele)) for a single correlation value.

    Round-off can leave ``ele`` a hair above 1.0. That would make the radicand
    negative, and ``math.sqrt`` would raise ValueError ("math domain error"),
    so such values are clamped to a distance of 0. NaN input propagates as NaN.

    :param ele:     correlation coefficient (nominally in [-1, 1])
    :return:        float distance in [0, 2], or NaN
    """
    radicand = 2*(1 - ele)
    if radicand < 0:
        radicand = 0.0
    return m.sqrt(radicand)

# Element-wise version of corrDist, kept for ad-hoc use. Declaring otypes skips
# numpy's probe call on the first element and allows empty inputs.
uFunc = np.vectorize(corrDist, otypes=[float])
# Distance matrix d = sqrt(2 * (1 - phi)), computed with numpy ufuncs instead of
# one Python call per entry. The radicand is clipped at 0 because round-off can
# leave phi slightly above 1 (e.g. on the diagonal), where math.sqrt would raise;
# NaN entries (zero-variance products) stay NaN.
distance = np.sqrt(np.clip(2*(1 - phi), 0, None))

## 3D MST Graph

In [18]:
import networkx as nx
from plotly.offline import plot
import plotly.graph_objs as go
from networkx.drawing.nx_agraph import graphviz_layout

In [19]:
# Weighted graph over products from the distance matrix (from_numpy_matrix was
# removed in networkx 3; from_numpy_array is its replacement).
# NOTE(review): per the networkx docs, zero entries produce no edge, so a pair
# with distance exactly 0 (phi == 1) is not connected; NaN distances become
# NaN-weighted edges — TODO decide how those should be handled.
G = nx.from_numpy_array(distance)
nodes = list(G.nodes())
mst = nx.minimum_spanning_tree(G, algorithm='prim')
# spring_layout starts from random positions. Seed numpy's global RNG so the
# 3-D layout, and the clustering computed from it later, is reproducible.
np.random.seed(42)
pos = nx.spring_layout(mst, dim=3)
N = len(nodes)
# Node k is feature column k (indexEAN k), and idx_to_name is sorted by
# indexEAN, so labels[k] names node k. Names are kept as text: on Python 3,
# .encode('utf-8') would turn them into bytes.
labels = list(idx_to_name['productName'])
assert len(labels) == N, "product names do not line up with graph nodes"

#### Node & Edge Coordinates

In [20]:
# Node coordinates: entry k of Xn/Yn/Zn is the x/y/z position of node k.
Xn, Yn, Zn = ([pos[k][dim] for k in range(N)] for dim in range(3))

# Edge coordinates: each MST edge contributes (start, end, None) per axis; the
# None separates one edge's segment from the next.
Edges = list(mst.edges)
Xe, Ye, Ze = [], [], []
for src, dst in Edges:
    Xe.extend([pos[src][0], pos[dst][0], None])
    Ye.extend([pos[src][1], pos[dst][1], None])
    Ze.extend([pos[src][2], pos[dst][2], None])

#### Average Linkage Clustering

In [21]:
from sklearn.cluster import AgglomerativeClustering
import random

# Average-linkage clustering of the nodes' 3-D layout coordinates.
# NOTE(review): this clusters the spring-layout positions, not the correlation
# distances themselves — confirm that is intended.
d = [[x, y, z] for x, y, z in zip(Xn, Yn, Zn)]
number_clusters = 20
# AgglomerativeClustering rejects more clusters than there are points.
n_clusters_used = min(number_clusters, len(d))
agg = AgglomerativeClustering(n_clusters=n_clusters_used, linkage='average')
result = agg.fit_predict(d)

# One random colour per cluster. A private, seeded Random instance keeps the
# palette identical across runs without disturbing the global `random` state.
color_rng = random.Random(42)
colors = ["#%06x" % color_rng.randint(0, 0xFFFFFF) for i in range(n_clusters_used)]
color_each_node = [colors[i] for i in result]

In [22]:
# For Edges
trace1=go.Scatter3d(x=Xe,
               y=Ye,
               z=Ze,
               mode='lines',
               line=dict(color='rgb(125,125,125)', width=1),
               hoverinfo='text'
               )

# For Nodes
trace2=go.Scatter3d(x=Xn,
               y=Yn,
               z=Zn,
               mode='markers',
               name='actors',
               marker=dict(symbol='circle',
                             size=6,
                             color=color_each_node, # Every node is given a different color
                             colorscale='Viridis',
                             line=dict(color='rgb(50,50,50)', width=0.5)
                             ),
               text=labels,
               hoverinfo='text'
               )

axis=dict(showbackground=False,
          showline=False,
          zeroline=False,
          showgrid=False,
          showticklabels=False,
          title=''
          )

In [23]:
layout = go.Layout(
    title="Product MST",
    width=1000,
    height=1000,
    showlegend=False,
    # Each 3-D axis gets its own copy of the hidden-axis style.
    scene=dict(xaxis=dict(axis), yaxis=dict(axis), zaxis=dict(axis)),
    margin=dict(t=100),
    hovermode='closest',
    annotations=[dict(
        text="Data source: Pair-wise Distance Matrix",
        showarrow=False,
        xref='paper', yref='paper',
        x=0, y=0.1,
        xanchor='left', yanchor='bottom',
        font=dict(size=14),
    )],
)

In [24]:
# Render edges + nodes to a standalone HTML file and open it in the browser.
data = [trace1, trace2]
fig = go.Figure(layout=layout, data=data)
plot(fig, show_link=False, auto_open=True, filename='./product_plotly.html')

'file:///Users/theinfamouswayne/Python Notebooks/product_plotly.html'

## Comparing Trends of Similar Products

In [25]:
# Total quantity sold per product per day, ordered by date, as a pandas frame.
df_qpd = (df.groupby(['Date', 'productName'])
            .agg(fn.sum('productQuantity').alias('Total'))
            .orderBy('Date')
            .toPandas())

In [26]:
# Daily totals for each of the three products being compared.
p1, p2, p3 = (df_qpd[df_qpd['productName'] == product]
              for product in (u"CREME FRAICHE 34%", u"MELLANMJÖLK", u"SMÖR NORMALSALTAT"))

In [27]:
def _trend_trace(frame, product_name, line_color):
    """Build a plotly line trace of daily quantity for one product.

    :param frame:           rows of df_qpd for a single product (uses Date and Total)
    :param product_name:    legend label for the line
    :param line_color:      colour of the line (e.g. '#17BECF')
    :return:                go.Scatter
    """
    # The name is passed as text: on Python 3, .encode('utf-8') would hand
    # plotly a bytes object instead of a string.
    return go.Scatter(x=frame.Date,
                      y=frame.Total,
                      name=product_name,
                      line=dict(color=line_color),
                      opacity=0.8)

trace_p1 = _trend_trace(p1, u"CREME FRAICHE 34%", '#17BECF')
trace_p2 = _trend_trace(p2, u"MELLANMJÖLK", '#7F7F7F')
trace_p3 = _trend_trace(p3, u"SMÖR NORMALSALTAT", '#BBCCDD')

In [28]:
# Time-axis layout with quick-range buttons (1 month, 6 months, all) and a range slider.
layout = dict(
    title='Time Series with Rangeslider',
    xaxis=dict(
        type='date',
        rangeslider=dict(visible=True),
        rangeselector=dict(buttons=[
            dict(count=1, label='1m', step='month', stepmode='backward'),
            dict(count=6, label='6m', step='month', stepmode='backward'),
            dict(step='all'),
        ]),
    ),
)

In [29]:
# Render the three product trends to a standalone HTML file and open it.
fig = dict(layout=layout, data=[trace_p1, trace_p2, trace_p3])
data = fig['data']
plot(fig, show_link=False, auto_open=True, filename='./range_slider.html')

'file:///Users/theinfamouswayne/Python Notebooks/range_slider.html'