In [1]:
import os
import sys


# Import SparkSession
from pyspark.sql import SparkSession

# Build the SparkSession
spark = SparkSession.builder \
    .master("local[10]") \
    .appName("CAV") \
    .config("spark.executor.memory", "6G") \
    .config("spark.storage.memoryFraction", 0.2) \
    .config("spark.driver.memory", "16G") \
    .getOrCreate()
   
sc = spark.sparkContext

In [2]:
sc.getConf().getAll()

[('spark.storage.memoryFraction', '0.2'),
 ('spark.driver.memory', '16G'),
 ('spark.executor.memory', '6G'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.port', '33541'),
 ('spark.driver.host', '172.31.5.36'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.name', 'CAV'),
 ('spark.master', 'local[10]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1529927576903')]

Old schema and files

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Visitor_ID", StringType(), True),
    StructField("Visit Number", IntegerType(), True),
    StructField("Products", IntegerType(), True),
    StructField("Product Views", IntegerType(), True)])

In [3]:
files = ['Report20171001-20180531.csv', 'Report20170101-20170930.csv']

df = spark.read.load(files[0], header=True, format="csv", schema=schema).cache();
for i in range(1, len(files)):
    df = df.union(spark.read.load(files[0], header=True, format="csv", schema=schema))

In [4]:
from pyspark.sql.functions import concat, col, lit

df = df.withColumn('session_id', concat(col("Visitor_ID"), lit("_"), col("Visit Number")))

In [8]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- session_id: string (nullable = true)



In [9]:
df.count()

430612176

In [10]:
df.show(5, False)

+---------------+----------+------------------------------------------+
|Date           |product_id|session_id                                |
+---------------+----------+------------------------------------------+
|October 1, 2017|171236    |1000046560612521877_444244119977433712_9  |
|October 1, 2017|38005     |1000046560612521877_444244119977433712_9  |
|October 1, 2017|39952     |1000046560612521877_444244119977433712_9  |
|October 1, 2017|40028     |1000046560612521877_444244119977433712_9  |
|October 1, 2017|27790     |1000102520911731070_1646240024588457894_40|
+---------------+----------+------------------------------------------+
only showing top 5 rows



In [8]:
from pyspark.sql.functions import isnan
df.filter((df["Visitor_ID"] == "") | df["Visitor_ID"].isNull()).count(), df.filter((df["Visit Number"] == "") | df["Visit Number"].isNull() | isnan(df["Visit Number"])).count()

(54, 54)

In [None]:
df.count()

In [5]:
%%time
from pyspark.sql.functions import isnan
df = df.filter((df["Visitor_ID"] != "") & df["Visitor_ID"].isNotNull() & df["Visit Number"].isNotNull() & ~isnan(df["Visit Number"]))

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 30.7 ms


In [10]:
df.select('Visitor_ID').distinct().count(), df.select('session_id').distinct().count()

(6369067, 99688677)

In [None]:
df.describe().show()

In [6]:
%%time
df = df.filter(df['Products'].isNotNull() & (df['Product Views'] > 0))
#df.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 13 ms


In [None]:
df.show(10, False)

In [7]:
df = df.drop('Visitor_ID', 'Date', 'Visit Number', 'Product Views')

In [None]:
df.show(10, False)

In [8]:
df = df.withColumnRenamed('Products', 'product_id')

### Preparing dictionary of simple to configurable products

In [9]:
import sys
sys.path.append("/nykaa/api")
from pas.v2.utils import Utils

es_conn = DiscUtils.esConn()

scroll_id = None
ES_BATCH_SIZE = 200

child_2_parent = {}
while True:
    if not scroll_id:
        query = {
            "query": {"match_all": {}},
            "size": ES_BATCH_SIZE,
            "_source": ["product_id", "parent_id"]
        }
        
        response = es_conn.search(index='livecore', body=query, scroll='5m')
    else:
        response = es_conn.scroll(scroll_id=scroll_id, scroll='5m')
    
    if not response['hits']['hits']:
        break
    scroll_id = response['_scroll_id']
    for hit in response['hits']['hits']:
        child_2_parent[int(hit['_source']['product_id'])] = int(hit['_source']['parent_id'])
        
if scroll_id:
    es_conn.clear_scroll(scroll_id=scroll_id)

In [None]:
print(str(child_2_parent.get(64059, 64059)))

In [None]:
df.printSchema()

In [10]:
%%time
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def convert_to_parent(product_id):
    return child_2_parent.get(product_id, product_id)

convert_to_parent_udf = udf(convert_to_parent, IntegerType())
df = df.withColumn("product_id", convert_to_parent_udf(df['product_id']))

CPU times: user 448 ms, sys: 8 ms, total: 456 ms
Wall time: 494 ms


In [None]:
df.show()

In [13]:
%%time
unique_products = [row['product_id'] for row in df.select('product_id').distinct().collect()]
len(unique_products)

CPU times: user 496 ms, sys: 36 ms, total: 532 ms
Wall time: 4min 5s


In [11]:
%%time
df = df.groupBy(['session_id', 'product_id']).agg({'product_id': 'count'})

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 35.1 ms


In [12]:
df = df.drop('count(product_id)')

In [13]:
product_to_sessions_count_df = df.groupBy('product_id').agg({'session_id': 'count'}).withColumnRenamed('count(session_id)', 'sessions_count').toPandas()
len(product_to_sessions_count_df)

  (fname, cnt))
  (fname, cnt))


76746

In [None]:
product_to_sessions_count_df.head()

In [14]:
%%time
product_to_sessions_count = dict(zip(product_to_sessions_count_df.product_id, product_to_sessions_count_df.sessions_count))

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 12 ms


In [None]:
df.count()

In [None]:
df.show(5)

In [15]:
%%time
merged_df = df.withColumnRenamed('product_id', 'product_id_x').join(df.withColumnRenamed('product_id', 'product_id_y'), on="session_id", how="inner")
#merged_df.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 57.2 ms


In [None]:
merged_df.show(5)

In [16]:
merged_df = merged_df[merged_df.product_id_x < merged_df.product_id_y]

In [23]:
merged_df.count()

1182459229

In [17]:
merged_df = merged_df.groupBy(['product_id_x', 'product_id_y']).agg({'session_id': 'count'})

In [None]:
merged_df.show(5)

In [25]:
merged_df.count()

131875853

In [18]:
final_df = merged_df.withColumnRenamed("count(session_id)", 'sessions_intersection')

In [None]:
final_df.show(5)

In [None]:
final_df = final_df.cache()

In [19]:
del df
del merged_df

# Using Histogram for visualizing products that are removed from recommendations table when using different threshold

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

data = {}
for i in range(1, 5, 1):
    _df = final_df[final_df['sessions_intersection'] >= i]
    _unique_products_x = [row['product_id_x'] for row in _df.select('product_id_x').distinct().collect()]
    _unique_products_y = [row['product_id_y'] for row in _df.select('product_id_y').distinct().collect()]
    _unique_products = set(_unique_products_x + _unique_products_y)
    data[i] = len(_unique_products)

In [None]:
plt.figure(figsize=(20,10))
plt.xlabel('Threshold of customers common for products to be similar')
plt.ylabel('No of products having recommendations')
plt.bar(range(len(data)), data.values(), align='center')
plt.xticks(range(len(data)), list(data.keys()))

plt.show()

In [20]:
final_df = final_df[final_df['sessions_intersection'] >= 2]

In [29]:
final_df.count()

60078908

In [21]:
from pyspark.sql.functions import udf

def compute_union_len(product_id_x, product_id_y, sessions_intersection):
     return product_to_sessions_count[product_id_x] + product_to_sessions_count[product_id_y] - sessions_intersection

compute_union_len_udf = udf(compute_union_len, IntegerType())
final_df = final_df.withColumn("sessions_union", compute_union_len_udf(final_df['product_id_x'], final_df['product_id_y'], final_df['sessions_intersection']))

In [None]:
final_df.show(5)

In [22]:
from pyspark.sql.types import FloatType
def compute_similarity(sessions_intersection, sessions_union):
     return sessions_intersection/sessions_union

compute_similarity_udf = udf(compute_similarity, FloatType())
final_df = final_df.withColumn("similarity", compute_similarity_udf(final_df['sessions_intersection'], final_df['sessions_union']))

In [23]:
from pyspark.sql.types import FloatType
import math
def compute_log_similarity(sessions_intersection, sessions_union):
     return sessions_intersection/math.log(sessions_union)

compute_log_similarity_udf = udf(compute_log_similarity, FloatType())
final_df = final_df.withColumn("log_similarity", compute_log_similarity_udf(final_df['sessions_intersection'], final_df['sessions_union']))

In [24]:
from pyspark.sql.types import FloatType
import math
def compute_sqrt_similarity(sessions_intersection, sessions_union):
     return sessions_intersection/math.sqrt(sessions_union)

compute_sqrt_similarity_udf = udf(compute_sqrt_similarity, FloatType())
final_df = final_df.withColumn("sqrt_similarity", compute_sqrt_similarity_udf(final_df['sessions_intersection'], final_df['sessions_union']))

In [None]:
final_df.show(5)

# Getting Recommendations

Making dict of product to similar products

In [34]:
from collections import defaultdict
direct_similar_products = defaultdict(lambda: [])
log_similar_products = defaultdict(lambda: [])
sqrt_similar_products = defaultdict(lambda: [])
simple_similar_products = defaultdict(lambda: [])

while True:
    rows = sqlContext.sql("SELECT * FROM final_df LIMIT %d" % (product_id, product_id, order_by, recommendations_cnt)).collect()

for row in final_df.collect():
    direct_similar_products[row['product_id_x']].append((row['product_id_y'], row['similarity']))
    direct_similar_products[row['product_id_y']].append((row['product_id_x'], row['similarity']))
    
    log_similar_products[row['product_id_x']].append((row['product_id_y'], row['log_similarity']))
    log_similar_products[row['product_id_y']].append((row['product_id_x'], row['log_similarity']))
    
    sqrt_similar_products[row['product_id_x']].append((row['product_id_y'], row['sqrt_similarity']))
    sqrt_similar_products[row['product_id_y']].append((row['product_id_x'], row['sqrt_similarity']))
    
    simple_similar_products[row['product_id_x']].append((row['product_id_y'], row['sessions_intersection']))
    simple_similar_products[row['product_id_y']].append((row['product_id_x'], row['sessions_intersection']))

Py4JJavaError: An error occurred while calling o364.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 101 tasks (1030.6 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2808)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2805)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
final_df.createOrReplaceTempView("final_df")
sqlContext.sql("SELECT * FROM final_df WHERE product_id_x=%d OR product_id_y=%d ORDER BY similarity DESC LIMIT 20" % (61496, 61496)).collect()

In [None]:
from pyspark.sql import SQLContext

import sys
sys.path.append("/nykaa/api")
from pas.v2.utils import Utils

sqlContext = SQLContext(sc)
final_df.createOrReplaceTempView("final_df")

def get_recommendations(product_id, recommendations_cnt=20, with_name=True, variant='simple'):
    if variant == 'log':
        order_by = 'log_similarity'
    elif variant == 'sqrt':
        order_by = 'sqrt_similarity'
    elif variant == 'direct':
        order_by = 'similarity'
    else:
        order_by = 'sessions_intersection'
        
    rows = sqlContext.sql("SELECT * FROM final_df WHERE product_id_x=%d OR product_id_y=%d ORDER BY %s DESC LIMIT %d" % (product_id, product_id, order_by, recommendations_cnt)).collect()
    recommendation_ids = []
    for row in rows:
        if row['product_id_x'] == product_id:
            recommendation_ids.append(row['product_id_y'])
        else:
            recommendation_ids.append(row['product_id_x'])
    if not with_name:
        return recommendation_ids
    query = {
        "query": {
            "terms": {"product_id.keyword": recommendation_ids}
        },
        "_source": ['title_text_split', 'product_id']
    }
    
    response = DiscUtils.makeESRequest(query, index='livecore')
    product_id_2_name = {int(hit['_source']['product_id']): hit['_source']['title_text_split'] for hit in response['hits']['hits']}
    return [(recommendation_id, product_id_2_name.get(recommendation_id)) for recommendation_id in recommendation_ids]

In [None]:
product_id = 288288
query = {
        "query": {
            "term": {"product_id": product_id}
        },
        "_source": ['title_text_split', 'product_id']
    }
    
response = DiscUtils.makeESRequest(query, index='livecore')
print ("Product Name: %s" % response["hits"]["hits"][0]['_source']['title_text_split'])
get_recommendations(288288, 30)

In [None]:
product_id_2_name[288289]

In [None]:
from joblib import Parallel, delayed

pasdb = DiscUtils.mysqlConnection()
cursor = pasdb.cursor()
algo = 'coccurence'

create_recommendations_table_query = """ CREATE TABLE IF NOT EXISTS recommendations (
                            entity_id INT UNSIGNED NOT NULL, 
                            entity_type VARCHAR(50),
                            recommendation_type VARCHAR(50),
                            algo VARCHAR(50),
                            variant VARCHAR(50),
                            recommended_products_json JSON,
                            PRIMARY KEY (entity_id, entity_type, recommendation_type, algo, variant)
                            )
"""
cursor.execute(create_recommendations_table_query)

def add_recommendations_for_products(product_ids):
    rows = []
    for variant in ['simple', 'direct', 'log', 'sqrt']:
        for product_id in product_ids:
            recommendations = get_recommendations(product_id, 20, False, variant)
            if not recommendations:
                continue
            recommendations_str = str(recommendations)
            rows.append((product_id, 'product', 'viewed', algo, variant, recommendations_str))
    
    if not rows:
        return
    values_str = ", ".join(["(%s, %s, %s, %s, %s, %s)" for i in range(len(rows))])
    values = tuple([_i for row in rows for _i in row])
    insert_recommendations_query = """ INSERT INTO recommendations(entity_id, entity_type, recommendation_type, algo, variant, recommended_products_json)
                VALUES %s ON DUPLICATE KEY UPDATE recommended_products_json=VALUES(recommended_products_json)
    """ % values_str
    values = tuple([str(_i) for row in rows for _i in row])
    cursor.execute(insert_recommendations_query, values, multi=True)
    pasdb.commit()

product_id_chunks = []

offset, SLICE_SIZE = 0, 10

while offset < len(unique_products):
    last_idx = offset + SLICE_SIZE if offset + SLICE_SIZE < len(unique_products) else len(unique_products)
    product_id_chunks.append(unique_products[offset:last_idx])
    offset = last_idx

Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs', backend="threading")(delayed(add_recommendations_for_products)(data) for data in product_id_chunks)

In [None]:
final_df.head(2)

In [42]:
final_df_pandas = final_df.toPandas()

Py4JJavaError: An error occurred while calling o364.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 101 tasks (1030.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2808)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2805)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


In [45]:
final_df.select('product_id_x').max()

AttributeError: 'DataFrame' object has no attribute 'max'

In [46]:
from pyspark.sql.functions import monotonically_increasing_id
final_df = final_df.withColumn('index', monotonically_increasing_id())

In [47]:
final_df.agg({'product_id_x': 'max'}).collect()[0]

Row(max(product_id_x)=311405)

In [49]:
final_df.agg({'index': 'max'}).collect()[0]

Row(max(index)=1709397283980)

In [None]:
%%time
from pyspark.sql import SQLContext
from collections import defaultdict

direct_similar_products = defaultdict(lambda: [])
log_similar_products = defaultdict(lambda: [])
sqrt_similar_products = defaultdict(lambda: [])
simple_similar_products = defaultdict(lambda: [])

max_id = 311405
sqlContext = SQLContext(sc)
final_df.createOrReplaceTempView("final_df")
offset, BATCH_SIZE = 0, 10000
while offset <= max_id:
    last_idx = (offset + BATCH_SIZE) if (offset + BATCH_SIZE) <= max_id else max_id 
    rows = sqlContext.sql("SELECT * FROM final_df WHERE product_id_x <= %d AND product_id_x > %d" % (last_idx, offset)).collect()
    offset = last_idx
    for row in rows:
        direct_similar_products[row['product_id_x']].append((row['product_id_y'], row['similarity']))
        direct_similar_products[row['product_id_y']].append((row['product_id_x'], row['similarity']))

        log_similar_products[row['product_id_x']].append((row['product_id_y'], row['log_similarity']))
        log_similar_products[row['product_id_y']].append((row['product_id_x'], row['log_similarity']))

        sqrt_similar_products[row['product_id_x']].append((row['product_id_y'], row['sqrt_similarity']))
        sqrt_similar_products[row['product_id_y']].append((row['product_id_x'], row['sqrt_similarity']))

        simple_similar_products[row['product_id_x']].append((row['product_id_y'], row['sessions_intersection']))
        simple_similar_products[row['product_id_y']].append((row['product_id_x'], row['sessions_intersection']))