In [1]:
import pyspark as ps
import psycopg2
import os
import datetime
import multiprocessing
from pyspark.mllib.recommendation import ALS
from pyspark.sql.types import StructField, StructType, IntegerType
from pyspark.ml.recommendation import ALS

In [2]:
def get_deck_card_counts(schema):
    '''
    Loads every deck/card/count row of the `decks` table into a Spark df.

    Relies on the module-level `cursor` (db cursor) and `spark` (SparkSession)
    having been created before this is called.

    INPUT:
        - schema: StructType object, schema applied to the fetched rows

    OUTPUT:
        - incomplete_ratings: Spark df, one row per
                              (deck_id, cardstorm_id, card_count) found in decks
    '''

    deck_query = 'SELECT deck_id, cardstorm_id, card_count FROM decks'
    cursor.execute(deck_query)
    deck_rows = cursor.fetchall()

    return spark.createDataFrame(data=deck_rows, schema=schema)

In [3]:
def get_unused_cardstorm_ids():
    '''
    Finds the cardstorm_ids in `cards` that never appear in `decks`.

    Relies on the module-level `cursor`.

    INPUT:
        NONE

    OUTPUT:
        - unused_ids: list, first column of every row the query returns
    '''

    # NOTE(review): NOT IN matches nothing if the subquery yields a NULL
    # cardstorm_id -- confirm decks.cardstorm_id can never be NULL.
    cursor.execute('''SELECT cardstorm_id
                      FROM cards
                      WHERE cardstorm_id
                        NOT IN (SELECT DISTINCT cardstorm_id FROM decks)''')

    # Each fetched row is a 1-tuple; unwrap it to the bare id.
    return [row[0] for row in cursor.fetchall()]

In [4]:
def fill_unused_cardstorm_ids(unused_ids):
    '''
    Builds one placeholder rating row for each unused cardstorm_id.

    INPUT:
        - unused_ids: iterable of ints, cardstorm_ids that don't show up in a deck

    OUTPUT:
        - filler_data: list of tuples, (deck_id, cardstorm_id, card_count) for
                        each id, in input order. deck_id is always -1 so the
                        filler rows are easy to identify; card_count is always 1
    '''

    return [(-1, cardstorm_id, 1) for cardstorm_id in unused_ids]

In [30]:
def upload_product_rdd(product_rdd):
    '''
    Adds the product features from the fitted Spark ALS model to the db.

    Every row is inserted into product_matrices under the same new run_id
    (one more than the largest existing run_id, or 1 if there is none) and
    stamped with today's date. Nothing is committed here: the caller decides
    whether to commit based on the return value.

    Relies on the module-level `cursor`.

    INPUT:
        - product_rdd: Spark rdd or df whose collect() yields
                       (cardstorm_id, features) pairs

    OUTPUT:
        - success: bool, True if every row was inserted. False if an insert
                   raised psycopg2.IntegrityError; in that case the open
                   transaction has been rolled back.
    '''
    current_date = str(datetime.date.today())

    cursor.execute('SELECT MAX(run_id) FROM product_matrices')
    last_run_id = cursor.fetchone()[0]
    # MAX() comes back as None when there are no rows, so the first run is 1.
    run_id = (0 if last_run_id is None else last_run_id) + 1

    # The statement is the same for every row, so build it once.
    query = '''INSERT INTO product_matrices (cardstorm_id, features, date, run_id)
               VALUES (%s, %s, %s, %s)'''

    for cardstorm_id, features in product_rdd.collect():
        try:
            cursor.execute(query, vars=[cardstorm_id, features, current_date, run_id])
        except psycopg2.IntegrityError:
            # After a failed statement the transaction is unusable until it is
            # rolled back; do that here so the shared connection stays usable
            # and no partial run is left pending.
            cursor.connection.rollback()
            return False

    return True

In [6]:
def make_recommender():
    '''
    Makes the recommender model! Gets deck data from the database, makes filler
    data for unused cards, uses Spark ALS to train a model of implicit ratings.
    Pulls out the item (product) factors matrix (often referred to as V) and
    uploads it to the database with the current date attached.

    Relies on the module-level `spark`, `cursor` and `conn`.

    INPUT:
        NONE

    OUTPUT:
        NONE

    Steps:

        create schema for spark DF
        get deck data from db
        get unused cardstorm_ids
        create fake data for all unused cards
        make new spark df from unused cards
        merge dataframes
        create and train ALS implicit model
        get the item factors
        upload them to db, committing only if the upload succeeded
    '''

    ratings_schema = StructType([StructField('deck_id', IntegerType()),
                                 StructField('cardstorm_id', IntegerType()),
                                 StructField('card_count', IntegerType())])

    incomplete_ratings = get_deck_card_counts(schema=ratings_schema)

    unused_ids = get_unused_cardstorm_ids()

    filler_data = fill_unused_cardstorm_ids(unused_ids)

    filler_ratings = spark.createDataFrame(data=filler_data,
                                           schema=ratings_schema)
    ratings_df = incomplete_ratings.union(filler_ratings)

    # ALS is the DataFrame-based pyspark.ml estimator (the later of the two
    # ALS imports wins). It has to be told which columns hold the user, item
    # and rating, since the schema's names are not its defaults.
    als = ALS(rank=30, implicitPrefs=True,
              userCol='deck_id', itemCol='cardstorm_id', ratingCol='card_count')

    # fit() returns the trained model; the estimator itself holds no factors.
    fitted_model = als.fit(ratings_df)

    # Spark df of item factors; its rows unpack as (id, features) pairs.
    product_features = fitted_model.itemFactors

    upload_status = upload_product_rdd(product_features)

    if upload_status:
        conn.commit()

In [7]:
def main():
    '''
    Runs the whole pipeline: connects to the database, starts Spark, builds
    and uploads the recommender, then closes the database connection.

    Connection settings are read from the CARDSTORM_DB_DBNAME,
    CARDSTORM_DB_HOST, CARDSTORM_DB_USERNAME and CARDSTORM_DB_PASSWORD
    environment variables. `conn`, `cursor` and `spark` are published as
    module-level globals because the helper functions read them from there.

    INPUT:
        NONE

    OUTPUT:
        NONE

    RAISES:
        - KeyError if any of the environment variables is missing
    '''
    global dbname, host, username, password, conn, cursor, spark
    dbname = os.environ['CARDSTORM_DB_DBNAME']
    host = os.environ['CARDSTORM_DB_HOST']
    username = os.environ['CARDSTORM_DB_USERNAME']
    password = os.environ['CARDSTORM_DB_PASSWORD']

    # Keyword arguments instead of a hand-formatted DSN string: values that
    # contain spaces or quotes (e.g. a password) would corrupt the DSN.
    conn = psycopg2.connect(dbname=dbname, host=host,
                            user=username, password=password)
    try:
        cursor = conn.cursor()

        spark = (ps.sql.SparkSession.builder
                       .master('local[{}]'.format(multiprocessing.cpu_count()))
                       .appName('cardstorm modeling')
                       .getOrCreate())

        make_recommender()
    finally:
        # Close the connection even when model building or the upload fails.
        conn.close()

In [8]:
# Interactive setup: same connection / Spark session as main(), but left open
# so the cells below can step through the pipeline one stage at a time.
dbname = os.environ['CARDSTORM_DB_DBNAME']
host = os.environ['CARDSTORM_DB_HOST']
username = os.environ['CARDSTORM_DB_USERNAME']
password = os.environ['CARDSTORM_DB_PASSWORD']

# Keyword arguments instead of a hand-formatted DSN string: values that
# contain spaces or quotes (e.g. a password) would corrupt the DSN.
conn = psycopg2.connect(dbname=dbname, host=host,
                        user=username, password=password)
cursor = conn.cursor()

spark = (ps.sql.SparkSession.builder
               .master('local[{}]'.format(multiprocessing.cpu_count()))
               .appName('cardstorm modeling')
               .getOrCreate())

# The one-shot run is left disabled here; closing the connection would break
# the step-by-step cells that follow.
# make_recommender()

# conn.close()

In [9]:
# Schema shared by the real and the filler ratings frames: three integer columns.
ratings_schema = StructType([
    StructField(column_name, IntegerType())
    for column_name in ('deck_id', 'cardstorm_id', 'card_count')
])

In [11]:
# Load the (deck_id, cardstorm_id, card_count) rows of the decks table as a Spark df.
incomplete_ratings = get_deck_card_counts(schema=ratings_schema)

In [12]:
# cardstorm_ids that exist in cards but are not in any deck.
unused_ids = get_unused_cardstorm_ids()

In [13]:
# One placeholder (-1, cardstorm_id, 1) tuple per unused card.
filler_data = fill_unused_cardstorm_ids(unused_ids)

In [14]:
# Turn the placeholder tuples into a df with the same schema, then append
# them to the real deck rows to get the full training set.
filler_ratings = spark.createDataFrame(data=filler_data, schema=ratings_schema)
ratings_df = incomplete_ratings.union(filler_ratings)

In [18]:
# Implicit-feedback ALS estimator; the column names match ratings_schema.
model = ALS(
    rank=30,
    implicitPrefs=True,
    userCol='deck_id',
    itemCol='cardstorm_id',
    ratingCol='card_count',
)

In [21]:
# Train on the combined real + filler ratings; fit() returns the fitted model.
fitted_model = model.fit(ratings_df)

In [None]:
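# Older RDD-based pyspark.mllib call, kept for reference only; the
# ALS(...).fit(ratings_df) cells above are what this notebook actually runs.
#     model = ALS.trainImplicit(ratings=ratings_df, rank=30)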

In [29]:
# Check what kind of object the fitted model exposes as its item factors.
type(fitted_model.itemFactors)

pyspark.sql.dataframe.DataFrame

In [None]:
# NOTE(review): unfinished scratch query in a cell with no execution count.
# Complete or remove it before running the notebook top to bottom; a failing
# statement here would leave the shared transaction aborted for the upload below.
cursor.execute('SELECT ')

In [27]:
# Despite the name this is a Spark DataFrame, not an RDD (see the type() check).
product_rdd = fitted_model.itemFactors

In [31]:
# Insert the factors under a new run_id; True only if no IntegrityError occurred.
upload_status = upload_product_rdd(product_rdd)

In [32]:
# Persist the inserted rows only when the upload reported success.
if upload_status:
    conn.commit()