## Recommendation System
Collaborative filtering with implicit feedback based on latent factors. Prepare data on user-item relationships for each user-company in format that ALS can use.
We require each unique assignee ID in the rows of the matrix, and each unique item ID in columns of matrix.
Values of matrix should be (?) binary user-item preference * confidence

In [62]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.mllib.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from lightfm import LightFM
from lightfm.datasets import fetch_movielens
from lightfm.evaluation import precision_at_k

import pandas as pd
import numpy as np

from test_model import (get_patent_fields_list, get_ml_patents, 
                        create_title_abstract_col,trim_data, 
                        structure_dataframe, partition_dataframe, 
                        build_pipeline, process_docs, pat_inv_map, get_topics)
import gensim
import gensim.corpora as corpora
from gensim.corpora import Dictionary, mmcorpus
from gensim.utils import simple_preprocess
from gensim.models import CoherenceModel
from gensim.models.phrases import Phrases, Phraser
from gensim.models.ldamodel import LdaModel
from gensim.models import AuthorTopicModel
from gensim.test.utils import common_dictionary, datapath, temporary_file
from smart_open import smart_open

import spacy
from spacy.lemmatizer import Lemmatizer
from spacy.lang.en import LEMMA_INDEX, LEMMA_EXC, LEMMA_RULES

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, punkt, RegexpTokenizer, wordpunct_tokenize
from nltk.stem import PorterStemmer, LancasterStemmer, WordNetLemmatizer

import json
from pandas.io.json import json_normalize
import requests
import re
import os
import calendar
import requests
from bs4 import BeautifulSoup
import pickle

import matplotlib.pyplot as plt
import pyLDAvis
import pyLDAvis.gensim

from pprint import pprint

%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [63]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark

In [64]:
sc = spark.sparkContext
sc

### Data understanding - Acquire data

In [123]:
# load pickled dataset
with open('/Users/lee/Documents/techniche/techniche/data/raw_data_1000', 'rb') as f:
    raw_data_1000 = pickle.load(f)

In [124]:
# define desired keys/columns as criteria to subset dataset
retained_keys = ['patent_number', 'patent_firstnamed_assignee_id']

In [125]:
# subset raw dataset by desired keys/columns
data_1000 = trim_data(data=raw_data_1000, keys=retained_keys)

In [126]:
df_1000 = pd.DataFrame(data_1000)

In [127]:
df_1000

Unnamed: 0,patent_firstnamed_assignee_id,patent_number
0,org_VU2IXnxgxGIK8A8oQrwm,10226194
1,org_9cmRc2rH8nbl8O9VuxYL,10228278
2,org_8O8xQifxyiW5pZB2KuDx,10228693
3,org_q9Bn28RHhpYrQjKvraAH,10228922
4,org_EilEWQcC6UiqHcSGx9mb,10228931
5,org_ID497r4tFbCIaMBjGAST,10229106
6,org_p6ofWD2xFNSnyYkj6wpA,10229109
7,org_Vbc6obpnxWM42d0HjlXY,10229113
8,org_uSkGGmX0kIBgxQmYxLGK,10229148
9,org_q9Bn28RHhpYrQjKvraAH,10229156


In [128]:
df_1000['rating'] = 1

In [130]:
type(df_1000)

pandas.core.frame.DataFrame

In [131]:
df_1000.loc[df_1000['patent_number'] == 'RE46902']

Unnamed: 0,patent_firstnamed_assignee_id,patent_number,rating
717,org_HzcdORNhDFJGkIpXyffr,RE46902,1


In [113]:
df_1000 = df_1000.dropna()

In [134]:
df_1000 = df_1000.drop(df_1000[df_1000['patent_number'].str.contains('[RE]')])
print (df_1000)

KeyError: "labels ['patent_firstnamed_assignee_id' 'patent_number' 'rating'] not contained in axis"

In [132]:
df_1000.drop(df_1000.loc[df_1000['patent_number'] == 'RE46902'], inplace=True)

KeyError: "labels ['patent_firstnamed_assignee_id' 'patent_number' 'rating'] not contained in axis"

In [114]:
df_1000.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 973 entries, 0 to 999
Data columns (total 3 columns):
patent_firstnamed_assignee_id    973 non-null object
patent_number                    973 non-null object
rating                           973 non-null int64
dtypes: int64(1), object(2)
memory usage: 30.4+ KB


In [120]:
df_1000 = df_1000.astype({'patent_number': 'int64'}).dtypes

ValueError: invalid literal for int() with base 10: 'RE46902'

In [None]:
({'col1': 'int32'})

In [107]:
# s = 'org_VU2IXnxgxGIK8A8oQrwm'

In [108]:
# code = [ord(c) for c in s]
# code

In [101]:
df_1000.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 3 columns):
patent_firstnamed_assignee_id    973 non-null object
patent_number                    1000 non-null object
rating                           1000 non-null int64
dtypes: int64(1), object(2)
memory usage: 23.5+ KB


In [None]:
#df_1000 = df_1000['patent_number'].to_numeric()

In [83]:
df_1000 = df_1000.apply(pd.to_numeric)

ValueError: ('Unable to parse string "org_VU2IXnxgxGIK8A8oQrwm" at position 0', 'occurred at index patent_firstnamed_assignee_id')

In [None]:
s = "4525ABT2"
table = str.maketrans('ABCDEFGHIJKLMNOPQRSTUVWXYZ',
                          '22233344455566677778889999')
s.translate(table)
'45252282'

In [71]:
sp_df_1000 = spark.createDataFrame(df_1000)

In [72]:
sp_df_1000.show()

+-----------------------------+-------------+------+
|patent_firstnamed_assignee_id|patent_number|rating|
+-----------------------------+-------------+------+
|         org_VU2IXnxgxGIK8...|     10226194|     1|
|         org_9cmRc2rH8nbl8...|     10228278|     1|
|         org_8O8xQifxyiW5p...|     10228693|     1|
|         org_q9Bn28RHhpYrQ...|     10228922|     1|
|         org_EilEWQcC6UiqH...|     10228931|     1|
|         org_ID497r4tFbCIa...|     10229106|     1|
|         org_p6ofWD2xFNSny...|     10229109|     1|
|         org_Vbc6obpnxWM42...|     10229113|     1|
|         org_uSkGGmX0kIBgx...|     10229148|     1|
|         org_q9Bn28RHhpYrQ...|     10229156|     1|
|         org_p6ofWD2xFNSny...|     10229173|     1|
|         org_q9Bn28RHhpYrQ...|     10229187|     1|
|         org_q9Bn28RHhpYrQ...|     10229189|     1|
|         org_iwO2oOJ6VIBd9...|     10229357|     1|
|         org_q9Bn28RHhpYrQ...|     10229368|     1|
|         org_9D8x1qL3IRASp...|     10229673| 

In [81]:
sp_df_1000.dtypes

[('patent_firstnamed_assignee_id', 'string'),
 ('patent_number', 'string'),
 ('rating', 'bigint')]

In [73]:
sp_df_1000.limit(5).toPandas()

Unnamed: 0,patent_firstnamed_assignee_id,patent_number,rating
0,org_VU2IXnxgxGIK8A8oQrwm,10226194,1
1,org_9cmRc2rH8nbl8O9VuxYL,10228278,1
2,org_8O8xQifxyiW5pZB2KuDx,10228693,1
3,org_q9Bn28RHhpYrQjKvraAH,10228922,1
4,org_EilEWQcC6UiqHcSGx9mb,10228931,1


In [21]:
# TODO (Lee) - from topic_model - convert dataframe from subsetted dict, organize columns and sort by patent_date
# df_1000 = structure_dataframe(data=data_1000)

In [None]:
# partition df_1000 into train and test dataframes
# data_train_1000, data_test_1000 = partition_dataframe(df_1000, .8)

In [None]:
df = spark.read.csv(data_1000)

In [33]:
df = spark.read.load("/Users/lee/Documents/techniche/techniche/data/raw_data_1000",
                     format="json")

Py4JJavaError: An error occurred while calling o117459.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 13, localhost, executor driver): java.io.CharConversionException: Invalid UTF-32 character 0x3031392d(above 10ffff)  at char #1, byte #7)
	at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:189)
	at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:150)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2017)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:577)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1$$anonfun$apply$3.apply(JsonInferSchema.scala:56)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1$$anonfun$apply$3.apply(JsonInferSchema.scala:55)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1.apply(JsonInferSchema.scala:55)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1.apply(JsonInferSchema.scala:53)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
	at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
	at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
	at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$.infer(JsonInferSchema.scala:83)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:108)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.infer(JsonDataSource.scala:98)
	at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:64)
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:59)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:179)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x3031392d(above 10ffff)  at char #1, byte #7)
	at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:189)
	at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:150)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2017)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:577)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1$$anonfun$apply$3.apply(JsonInferSchema.scala:56)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1$$anonfun$apply$3.apply(JsonInferSchema.scala:55)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1.apply(JsonInferSchema.scala:55)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1.apply(JsonInferSchema.scala:53)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
	at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
	at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
	at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
df.persist()

In [None]:
df.limit(5).toPandas()

### Data preparation
Prepare data on user-item relationships for each user-company in format that ALS can use.
We require each unique assignee ID in the rows of the matrix, and each unique item ID in columns of matrix.
Values of matrix should be (?) binary user-item preference * confidence

In [None]:
latent_ratings = 

In [None]:
# Columns are: assigneeId, itemId, (transaction x confidence)

In [None]:
#### Partition data into training and test sets
(training, test) = latent_ratings.randomSplit([0.8, 0.2])

### Model # 1

In [12]:
# set implicitPrefs to True to get better results b/c latent_ratings matrix 
# derived from another source of information (i.e. it is inferred from other signals), 

In [79]:
# split into 
(training, test) = sp_df_1000.randomSplit([0.8, 0.2])

In [80]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5,rank=4, 
          regParam=0.01, 
          userCol="userId", 
          itemCol="movieId", 
          ratingCol="rating",
          coldStartStrategy="drop")

TypeError: object() takes no parameters

In [None]:
# fit the ALS model to the training set
model = als.fit(training)

In [None]:
# build recommendation model using ALS
rank = 10
numIterations = 10
alpha=

# als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
#           userCol="userId", itemCol="movieId", ratingCol="rating")

In [None]:
# build recommendation model using ALS based on implicit ratings
als = ALS(maxIter=5,
          rank=4, 
          regParam=0.01, 
          userCol="patent_firstnamed_assignee_id", 
          itemCol="patent_number", 
          ratingCol="new",
          coldStartStrategy="True")

In [75]:
# second example
model = ALS.trainImplicit(sp_df_1000, numIterations, alpha=0.01)

NameError: name 'numIterations' is not defined

#### Model #1 - Evaluation - Compare to naive baseline
Compare model evaluation result with naive baseline model that only outputs (for explicit - the average rating (or you may try one that outputs the average rating per movie).

#### Model #1 - Optimize model

In [76]:
als_model =  ALS(userCol="patent_firstnamed_assignee_id",
                 itemCol="patent_number",
                 ratingCol="rating", 
                 coldStartStrategy="drop")             

TypeError: object() takes no parameters

In [78]:
als = ALS(maxIter=5, regParam=0.01, 
          userCol="patent_firstnamed_assignee_id", 
          itemCol="patent_number", 
          ratingCol="rating")

TypeError: object() takes no parameters

In [None]:
model = als.fit(sp_df_1000)

In [None]:
params = ParamGridBuilder().addGrid(als_model.regParam, [0.01,0.001,0.1]).addGrid(als_model.rank, [4,10,50]).build()


## instantiate crossvalidator estimator
cv = CrossValidator(estimator=als_model, estimatorParamMaps=params,evaluator=evaluator,parallelism=4)
best_model = cv.fit(movie_ratings)    

In [None]:
# Getting Predictions for a New User

In [None]:
predictions = model.transform(test)