In [1]:
!pip install google-cloud-bigquery
!pip install db-dtypes
!pip install tqdm
!pip install pyspark
!pip install pandas



In [5]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import requests
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import os
from pyspark.sql import Row
from tqdm import tqdm


# Spark needs a JVM. Fall back to the Java 11 install below only when the
# environment has not already chosen one, so the notebook is not tied to a
# single machine's JDK location.
os.environ.setdefault("JAVA_HOME", "/usr/lib/jvm/java-11-openjdk-amd64")


# Show which JVM will actually be used.
print(os.environ["JAVA_HOME"])


# Reuse the active session if one exists, otherwise start a new one.
spark = SparkSession.builder.appName('recommendation_ai').getOrCreate()


/usr/lib/jvm/java-11-openjdk-amd64


23/10/18 22:50:35 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 155.246.81.32 instead (on interface eno1)
23/10/18 22:50:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/18 22:50:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# All three inputs are read the same way: first line is a header and the
# column types are inferred from the data.
csv_options = {"inferSchema": True, "header": True}

likes_df, posts_df, profiles_df = (
    spark.read.csv(csv_path, **csv_options)
    for csv_path in ('likes.csv', 'posts.csv', 'profiles.csv')
)

# Preview the posts table.
posts_df.show()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/bryan/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/bryan/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
[Stage 1:>                                                        (0 + 56) / 56]

KeyboardInterrupt: 

                                                                                

In [None]:
import concurrent.futures

def get_content(uri1, uri2, timeout=10):
    """Download the text behind ``uri1``, falling back to ``uri2``.

    The URIs are tried in order. A URI is skipped when it is empty or
    ``None``, when the request raises (connection error, invalid URL,
    timeout, ...), or when the server answers with an HTTP error status.

    Args:
        uri1: Preferred location of the content.
        uri2: Fallback location, used only when ``uri1`` yields nothing.
        timeout: Seconds to wait on each request before giving up on it.

    Returns:
        The response body as text, or ``""`` when neither URI could be fetched.
    """
    for uri in (uri1, uri2):
        if not uri:
            # Nothing to request for a missing URI.
            continue
        try:
            response = requests.get(uri, timeout=timeout)
            # Treat 4xx/5xx answers as failures instead of storing an error page.
            response.raise_for_status()
            return response.text
        except Exception:
            # Best effort: move on to the next candidate. Catching Exception
            # rather than using a bare ``except`` lets KeyboardInterrupt
            # still abort a long download run.
            continue
    return ""
        
# Bring every distinct (s3_metadata_location, content_uri) combination to the
# driver and turn each Spark Row into a plain tuple.
distinct_uri_rows = posts_df.select('s3_metadata_location', 'content_uri').distinct().collect()
uri_pairs = [tuple(uri_row) for uri_row in distinct_uri_rows]


def get_content_pair(uri_pair):
    """Fetch the content for one ``(s3_metadata_location, content_uri)`` pair.

    The content URI is tried first and the metadata location is the fallback.

    Returns:
        A ``Row`` with the fields ``s3_metadata_location`` and ``content``.
    """
    metadata_location, primary_uri = uri_pair
    fetched_text = get_content(primary_uri, metadata_location)
    return Row(s3_metadata_location=metadata_location, content=fetched_text)

# Fetch all URI pairs concurrently with a thread pool, showing progress as
# results arrive.
with concurrent.futures.ThreadPoolExecutor() as executor:
    rows = list(tqdm(executor.map(get_content_pair, uri_pairs), total=len(uri_pairs), desc="Loading content"))

# Keep a single entry per s3_metadata_location. uri_pairs is distinct on the
# (location, content_uri) pair, so the same location can occur several times,
# and duplicate join keys would multiply the matching posts in the join below.
# When a location occurs more than once, the first non-empty content wins.
content_by_location = {}
for row in rows:
    location = row.s3_metadata_location
    if not content_by_location.get(location):
        content_by_location[location] = row.content

# Build the lookup table with an explicit schema: schema inference fails on an
# empty list, which is what we get when posts_df has no rows.
# NOTE(review): assumes s3_metadata_location is a string column in posts_df — confirm.
content_df = spark.createDataFrame(
    list(content_by_location.items()),
    schema="s3_metadata_location string, content string",
)

# Attach the downloaded content to every post; posts without a match get NULL.
posts_df = posts_df.join(content_df, on='s3_metadata_location', how='left')

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import when
from pyspark.ml.feature import StringIndexer



# Project the likes table onto the three columns the recommender works with.
ratings_df = likes_df.selectExpr("actioned_by_profile_id as userId", "publication_id as postId", "reaction as rating")

# Drop rows without a user or a post BEFORE indexing: StringIndexer's default
# handleInvalid="error" rejects NULL input, and such rows cannot be attributed
# to a user/post pair anyway.
ratings_df = ratings_df.filter(col("userId").isNotNull() & col("postId").isNotNull())

# Map each distinct userId / postId to a numeric index. One indexer per
# column, and the fitted models are kept so the indices can later be
# translated back to the original ids (via their `labels`).
user_indexer = StringIndexer(inputCol="userId", outputCol="userIdIndex")
post_indexer = StringIndexer(inputCol="postId", outputCol="postIdIndex")

user_indexer_model = user_indexer.fit(ratings_df)
ratings_df = user_indexer_model.transform(ratings_df)

post_indexer_model = post_indexer.fit(ratings_df)
ratings_df = post_indexer_model.transform(ratings_df)

# Map "UPVOTE" to 1; every other reaction value is mapped to -1.
# NOTE(review): this also turns NULL/unknown reactions into -1 — confirm that
# the data only contains "UPVOTE" and "DOWNVOTE".
ratings_df = ratings_df.withColumn("rating", when(col("rating") == "UPVOTE", 1).otherwise(-1))

ratings_df.show(5)


In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Expects `ratings_df` to provide the columns 'userIdIndex', 'postIdIndex' and 'rating'.

# One fixed seed for both the split and the ALS initialisation, so repeated
# runs over the same input are comparable instead of producing a different
# RMSE every time.
SEED = 42

# Hold out 20% of the ratings for evaluation.
(training, test) = ratings_df.randomSplit([0.8, 0.2], seed=SEED)

# coldStartStrategy="drop" removes test rows whose user or post was not seen
# during training; their prediction would be NaN and would make the RMSE NaN.
als = ALS(maxIter=5, regParam=0.01, userCol="userIdIndex", itemCol="postIdIndex", ratingCol="rating",
          coldStartStrategy="drop", seed=SEED)

# Train the ALS model
model = als.fit(training)

# Make predictions on the held-out data
predictions = model.transform(test)

# Evaluate the model: RMSE between the predicted and the actual rating
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

In [None]:
# Generate the top 10 post recommendations for each user
userRecs = model.recommendForAllUsers(10)

userRecs.show()

# The CSV writer cannot store the array-typed recommendations column, so
# serialise it to a string first.
userRecs = userRecs.withColumn("recommendations", col("recommendations").cast("string"))

# Spark writes a directory of part files under this path. The default save
# mode raises if the path already exists, which would break every re-run of
# the notebook, so replace the previous export instead.
userRecs.write.mode("overwrite").csv("user_recommendations.csv")