In [None]:
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName("JupyterHub PySpark Example")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

# /!\ Tout se fera à partir de cet objet magique `spark`

In [None]:
import os

import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, FloatType

In [None]:
# Load the raw CSV exports; the first line of each file holds the column names.
df_beers = spark.read.option("header", True).csv("/datasets/csv/beers.csv")
df_breweries = spark.read.option("header", True).csv("/datasets/csv/breweries.csv")

In [None]:
@F.udf(returnType=FloatType())
def safe_cast_to_float(str_float: str):
    """Parse a string cell as a float, yielding null when it cannot be parsed.

    Args:
        str_float: raw cell value; may be ``None`` or a non-numeric string.

    Returns:
        The parsed ``float``, or ``None`` (a Spark null) for missing or
        unparseable input.
    """
    try:
        return float(str_float)
    except (TypeError, ValueError):
        # None raises TypeError, "" / "abc" raise ValueError: one dirty cell
        # must not fail the whole job.
        return None

# Beers joined to their brewery. The brewery's `name` column is renamed first
# so that it does not clash with the beer's own `name`.
breweries_renamed = df_breweries.withColumnRenamed("name", "brewer_name")
df_beers_brewers = df_beers.join(
    breweries_renamed,
    on=df_beers.brewery_id == df_breweries.id,
).cache()

# UC-1

In [None]:
%%time 
n_beers = df_beers.count()
print(f"Q1: {n_beers} dans la DB")

In [None]:
%%time
print("Q2")
dd = (df_beers
      .join(df_breweries, on=df_beers.brewery_id == df_breweries.id)
      .groupby("country")
      .count()
      .sort(F.col("count").desc())
      .limit(10)
)
dd.show()

In [None]:
%%time
# Q3

@F.udf(returnType=FloatType())
def safe_cast_to_float(str_float: str):
    return float(str_float)

df_beers_brewers = (
    df_beers
    .join(df_breweries.withColumnRenamed("name", "brewer_name"), on=df_beers.brewery_id == df_breweries.id)
).cache()

print("Q3")
dd = (df_beers_brewers
      .filter(F.col("country") == F.lit("France"))
      .withColumn("abv_float", safe_cast_to_float(F.col("abv")))
      .sort(F.col("abv_float").desc())
      .select(["name", "abv_float", "country"])
      .limit(10)
)
dd.show()

In [None]:
%%time
print("Q4")
df_style = spark.read.csv("/datasets/csv/styles.csv", header=True)
target_style_id = df_style.filter(F.lower(F.col("style_name")) == "porter").select(F.col("id").alias("style_id"))
dd = (
    df_beers_brewers
    .join(target_style_id, how="inner", on="style_id")
    .withColumn("abv_float", safe_cast_to_float(F.col("abv")))
    .select(["name", "brewer_name", "abv_float", "country"])
    .groupby("country")
    .agg(F.avg("abv_float").alias("avg_abv"), F.countDistinct("brewer_name").alias("n_brewer_having_porter"))
    .show()
)

In [None]:
%%time
print("Q5")
dd = (
    df_beers_brewers
    .groupby("country")
    .count()
    .agg(F.median("count"))
)
print("Q5:", dd.first()[0])

# UC-2
Exo difficile

In [None]:
# output schema : https://stackoverflow.com/a/54771215/10716281

from pyspark.sql.types import *

# pandas dtype name -> Spark type class used for the corresponding column.
mapping = {
    "float64": DoubleType,
    "object": StringType,
    "int64": LongType,  # 64-bit pandas integers need Spark's 64-bit LongType
    "int32": IntegerType,
    "bool": BooleanType,
}  # Incomplete - extend with your types.


def createUDFSchemaFromPandas(dfp):
    """Derive a Spark ``StructType`` from the dtypes of a pandas DataFrame.

    Args:
        dfp: pandas DataFrame whose columns and dtypes describe the schema.

    Returns:
        A ``StructType`` with one field per column of ``dfp``, in column order.

    Raises:
        KeyError: if a column has a dtype that is not listed in ``mapping``.
    """
    fields = []
    for column in dfp.columns:
        dtype_name = str(dfp.dtypes[column])
        if dtype_name not in mapping:
            raise KeyError(
                f"No Spark type mapped for pandas dtype {dtype_name!r} "
                f"(column {column!r}); extend `mapping`."
            )
        fields.append(StructField(column, mapping[dtype_name]()))
    return StructType(fields)

In [None]:
def compute_cascade_model_per_user_query(df: pd.DataFrame) -> pd.DataFrame:
    """Flag which rows of a result page were seen and which one was clicked.

    A row counts as seen when its ``pos_in_serp`` is at or before the position
    of the first row whose ``id_in_serp`` equals its ``clicked_id``. Rows at
    exactly that position count as clicked.

    Args:
        df: rows with ``id_in_serp``, ``clicked_id`` and ``pos_in_serp``
            columns (presumably one (query, user_id) group - confirm with caller).

    Returns:
        A new DataFrame with two extra integer columns, ``seen`` and
        ``clicked`` (0/1). The input frame is left untouched.

    Raises:
        IndexError: if no row has ``id_in_serp == clicked_id``.
    """
    is_clicked_row = df["id_in_serp"] == df["clicked_id"]
    pos_of_clicked_id = df.loc[is_clicked_row, "pos_in_serp"].iloc[0]
    # `assign` builds a new frame, so the caller's DataFrame is never mutated.
    return df.assign(
        seen=(df["pos_in_serp"] <= pos_of_clicked_id).astype(int),
        clicked=(df["pos_in_serp"] == pos_of_clicked_id).astype(int),
    )

# Run the function once on a tiny local sample, then derive the schema from its output.
df_sample = df_pref.limit(3).toPandas()
df_processed = compute_cascade_model_per_user_query(df_sample)
schema = createUDFSchemaFromPandas(df_processed)

In [None]:
# Step 1: tag seen / clicked rows per (query, user_id) group and keep the seen ones.
df_seen = (
    df_pref
    .repartition(16, "query", "user_id")
    .groupby(["query", "user_id"])
    .applyInPandas(compute_cascade_model_per_user_query, schema)
    .filter(F.col("seen") == F.lit(1))
)
# Step 2: click probability per (query, result) = clicks / views.
df_click_proba = (
    df_seen
    .groupby(["query", "id_in_serp"])
    .agg(F.sum("seen").alias("n_seen"), F.sum("clicked").alias("n_clicked"))
    .withColumn("clic_proba", F.col("n_clicked") / F.col("n_seen"))
    .select(["query", "id_in_serp", "clic_proba"])
)
df_click_proba.show()

# UC-3 search from a query

**Observation :** On ne pourra pas aller bien loin en termes de souplesse dans la requête.

In [None]:
# Brewery side: description defaulted to "" and name renamed to avoid a clash.
brewery_texts = (
    df_breweries
    .select(["id", "name", "descript"])
    .withColumn("brewer_text", F.coalesce(F.col("descript"), F.lit("")))
    .withColumnRenamed("name", "brewer_name")
)
# Beer side: same treatment for the beer's own description and name.
beer_texts = (
    df_beers
    .select(["id", "name", "descript", "brewery_id"])
    .withColumn("beer_text", F.coalesce(F.col("descript"), F.lit("")))
    .withColumnRenamed("name", "beer_name")
)
# One searchable `text` per beer: beer description followed by brewery description.
df_description = (
    beer_texts
    .join(brewery_texts, on=df_beers.brewery_id == df_breweries.id)
    .withColumn("text", F.concat(F.col("beer_text"), F.lit(". "), F.col("brewer_text")))
    .select(["beer_name", "brewer_name", "text"])
).cache()

In [None]:
query = "stout"
# Keep the rows whose concatenated text contains the query as a substring.
text_matches_query = F.col("text").contains(query)
df_description.filter(text_matches_query).toPandas()

# UC-4 vectorize items

In [None]:
import requests
from typing import List
import numpy as np

class JinaEmbedder:
    
    URL = 'https://api.jina.ai/v1/embeddings'
    EMBEDDING_NAME = "jina-embeddings-v2-base-en"
    bearer_token = 'Bearer jina_85ba1ab9e5ff4017b3d216ebb8734f27xzJ9WyoYBFwqks9lOaNLHryw_Yyz'

    @staticmethod
    def http_json_to_vec(http_json: dict):
        return np.array(
            [
                sentence["embedding"]
                for sentence in http_json["data"]
            ]
        )        

    @classmethod
    def embed(cls, str_to_vectorize: List[str] | str) -> np.ndarray:
        if isinstance(str_to_vectorize, str):
            str_to_vectorize = [str_to_vectorize]
        headers = {
            'Content-Type': 'application/json',
            'Authorization': cls.bearer_token
        }
        data = {
            'model': cls.EMBEDDING_NAME,
            'normalized': True,
            'embedding_type': 'float',
            'input': str_to_vectorize
        }
        
        response = requests.post(cls.URL, headers=headers, json=data)

        if response.status_code != 200:
            return None

        return JinaEmbedder.http_json_to_vec(response.json())


In [None]:
@F.udf(StringType())
def craft_to_txt_vectorize(beer_name, brewer_name, beer_text, brewer_text, abv, ibu, srm):
    """Build the English sentence describing one beer, used as text to vectorize.

    Every missing (``None`` or empty) argument is rendered as an empty string.
    The three specs use the ``{name=}`` f-string form, so they appear as
    ``abv='...'``, ``ibu='...'``, ``srm='...'`` in the result.

    Returns:
        The description sentence as a string.
    """
    beer_name = beer_name or ""
    brewer_name = brewer_name or ""
    beer_text = beer_text or ""
    brewer_text = brewer_text or ""
    abv = abv or ""
    ibu = ibu or ""
    srm = srm or ""
    # "rewery" was a typo for "brewery"; it ended up as a token in every document.
    return f"The brewery {brewer_name} ({brewer_text}) brews {beer_name} which is described as {beer_text}. Spec of the beer is {abv=}, {ibu=}, {srm=}"

In [None]:
# Brewery side: description defaulted to "" and name renamed to avoid a clash.
brewery_side = (
    df_breweries
    .select(["id", "name", "descript"])
    .withColumn("brewer_text", F.coalesce(F.col("descript"), F.lit("")))
    .withColumnRenamed("name", "brewer_name")
)
# Beer side, aliased "beer" so that its `id` can be addressed as "beer.id" after the join.
beer_side = (
    df_beers.alias("beer")
    .withColumn("beer_text", F.coalesce(F.col("descript"), F.lit("")))
    .withColumnRenamed("name", "beer_name")
)
df_description = (
    beer_side
    .join(brewery_side, on=df_beers.brewery_id == df_breweries.id)
    .select(["beer.id", "beer_name", "brewer_name", "beer_text", "brewer_text", "abv", "ibu", "srm"])
).cache()

# Add the sentence to vectorize, then keep only the beer id and that sentence.
df_with_sentence = df_description.withColumn(
    "to_vec",
    craft_to_txt_vectorize("beer_name", "brewer_name", "beer_text", "brewer_text", "abv", "ibu", "srm"),
)
df_vec = df_with_sentence.repartition(2).select("beer.id", "to_vec")

In [None]:
def my_func(iterator):
    """Embed every row of one partition with a single batched call.

    Args:
        iterator: rows exposing ``.id`` and ``.to_vec`` attributes.

    Yields:
        ``(id, embedding)`` pairs in row order, the embedding being a plain
        list of floats.

    Raises:
        RuntimeError: if ``JinaEmbedder.embed`` returns ``None`` for a
            non-empty batch.
    """
    ids = []
    texts = []
    for row in iterator:
        ids.append(row.id)
        texts.append(row.to_vec)
    if not texts:
        # Empty partition: nothing to embed, so do not spend an API call on it.
        return
    embeddings = JinaEmbedder.embed(texts)
    if embeddings is None:
        # Previously this surfaced as an opaque TypeError raised by zip().
        raise RuntimeError(
            f"Embedding request failed for a batch of {len(texts)} texts"
        )
    for _id, embedding in zip(ids, embeddings):
        yield _id, embedding.tolist()

# Schema of the (id, embedding) pairs yielded by `my_func`.
schema = (
    StructType()
    .add(StructField("id", StringType(), True))
    .add(StructField("embedding", ArrayType(FloatType()), True))
)

In [None]:
# Only embed a sample: rows whose id modulo 12*12 equals 3.
sampled_rows = df_vec.filter(F.col("id") % F.lit(12*12) == 3)
vectorized_df = sampled_rows.rdd.mapPartitions(my_func).toDF(schema).cache()
vectorized_df.toPandas()

**Remarque :** on aurait pu ne pas utiliser `mapPartitions` ou ne pas passer par la fonction tampon `my_func`, MAIS on n'aurait alors pas appelé Jina en batch => cela aurait tué les perfs.

# UC-5 : answer questions in the corpus
Pas vraiment de possibilité native en SQL

In [None]:
# Free-text questions to match against the beer corpus.
queries = [
    "very bitter beer with smoky taste",
    "fruity sour - balanced sourness",
    "weird beer",
]

In [None]:
from pyspark.mllib.feature import HashingTF, IDF

In [None]:
# Corpus = one generated sentence (`to_vec`) per beer id.
corpus_with_sentence = df_description.withColumn(
    "to_vec",
    craft_to_txt_vectorize("beer_name", "brewer_name", "beer_text", "brewer_text", "abv", "ibu", "srm"),
)
df_corpus = corpus_with_sentence.repartition(2).select("id", "to_vec")
df_corpus.limit(2).show()

[Stage 53:>                                                         (0 + 1) / 1]

+---+--------------------+
| id|              to_vec|
+---+--------------------+
|100|The rewery Nebras...|
|529|The rewery Brasse...|
+---+--------------------+



                                                                                

In [None]:
from pyspark.ml.feature import HashingTF, IDF

def create_tf_and_idf_from_corpus(df_corpus):
    """Fit the TF-IDF building blocks on a corpus.

    Args:
        df_corpus: DataFrame with a ``to_vec`` text column; it is split on
            single spaces to obtain the tokens.

    Returns:
        ``(tf, idf)``: the ``HashingTF`` transformer and the IDF model fitted
        on the corpus term frequencies.
    """
    tokenized_corpus = df_corpus.withColumn("tokenized", F.split("to_vec", " "))
    tf = HashingTF(inputCol="tokenized", outputCol="raw_features")
    term_frequencies = tf.transform(tokenized_corpus)
    idf_estimator = IDF(inputCol="raw_features", outputCol="features")
    idf = idf_estimator.fit(term_frequencies)
    return tf, idf

def turn_to_tf_idf(df_docs, tf, idf):
    """Apply an already-fitted ``tf`` / ``idf`` pair to a DataFrame of texts.

    Args:
        df_docs: DataFrame with a ``to_vec`` text column.
        tf: term-frequency transformer, applied to the space-split tokens.
        idf: fitted IDF model, applied to the output of ``tf``.

    Returns:
        The result of ``idf.transform``, i.e. ``df_docs`` enriched with the
        columns added by tokenization, ``tf`` and ``idf``.
    """
    tokenized_docs = df_docs.withColumn("tokenized", F.split("to_vec", " "))
    return idf.transform(tf.transform(tokenized_docs))

In [None]:
# Fit the TF transformer and the IDF model once, on the whole corpus.
tf, idf = create_tf_and_idf_from_corpus(df_corpus=df_corpus)

                                                                                

In [None]:
# Project the corpus and the queries with the same fitted tf / idf pair.
corpus_tfidf = turn_to_tf_idf(df_corpus, tf, idf)

queries_pdf = pd.DataFrame({"to_vec": queries})
queries_df = spark.createDataFrame(queries_pdf)
queries_tfidf = turn_to_tf_idf(queries_df, tf, idf)

In [None]:
# Collect the query rows on the driver, then broadcast them.
query_rows = queries_tfidf.select(["features", "tokenized"]).collect()
broadcast_queries_tfidf = spark.sparkContext.broadcast(query_rows)

24/10/14 08:01:40 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
                                                                                

In [None]:

# Function to compute dot product
def compute_dot_product(doc_vec, query_vec):
    """Return the dot product of a document vector and a query vector.

    The vector's own ``dot`` method is used, so the 262144-dimension sparse
    TF-IDF vectors are not expanded into dense arrays for every row pair.

    Args:
        doc_vec: vector exposing ``dot`` (the corpus ``features`` value).
        query_vec: vector accepted by ``doc_vec.dot`` (the query ``features`` value).

    Returns:
        The dot product as a Python ``float``.
    """
    return float(doc_vec.dot(query_vec))

# Wrap the dot product as a Spark UDF returning a double
dot_product_udf = F.udf(compute_dot_product, returnType=DoubleType())

# Rebuild a DataFrame from the broadcast query rows
queries_rdd = spark.sparkContext.parallelize(broadcast_queries_tfidf.value)
queries_df_expanded = spark.createDataFrame(queries_rdd)

# Pair every corpus row with every query row
corpus_side = corpus_tfidf.alias("corpus")
queries_side = queries_df_expanded.alias("queries")
cross_joined_df = corpus_side.crossJoin(queries_side)

                                                                                

In [None]:
# Score every (document, query) pair with the dot-product UDF
doc_features = cross_joined_df["corpus.features"]
query_features = cross_joined_df["queries.features"]
result_df = cross_joined_df.withColumn(
    "dot_product", dot_product_udf(doc_features, query_features)
).cache()

# Show result
result_df.show()

24/10/14 08:01:53 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
24/10/14 08:01:55 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB

+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|  id|              to_vec|           tokenized|        raw_features|            features|            features|           tokenized|       dot_product|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
| 100|The rewery Nebras...|[The, rewery, Neb...|(262144,[7058,760...|(262144,[7058,760...|(262144,[66208,12...|[very, bitter, be...|               0.0|
| 529|The rewery Brasse...|[The, rewery, Bra...|(262144,[7606,812...|(262144,[7606,812...|(262144,[66208,12...|[very, bitter, be...|               0.0|
| 686|The rewery Elysia...|[The, rewery, Ely...|(262144,[7606,187...|(262144,[7606,187...|(262144,[66208,12...|[very, bitter, be...|               0.0|
| 674|The rewery Big Ti...|[The, rewery, Big...|(262144,[1641,462...|(262144,[1641,462..

24/10/14 08:02:20 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
                                                                                

In [None]:
# Ten highest-scoring (document, query) pairs.
result_df.orderBy(F.col("dot_product").desc()).limit(10).show()

24/10/14 08:02:20 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|  id|              to_vec|           tokenized|        raw_features|            features|            features|           tokenized|       dot_product|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|5518|The rewery Sabmil...|[The, rewery, Sab...|(262144,[7058,760...|(262144,[7058,760...|(262144,[66208,12...|[very, bitter, be...|105.79780515834159|
|3602|The rewery Boston...|[The, rewery, Bos...|(262144,[336,1578...|(262144,[336,1578...|(262144,[16704,27...|[fruity, sour, -,...| 83.91526396115003|
|5171|The rewery Roy Pi...|[The, rewery, Roy...|(262144,[991,1581...|(262144,[991,1581...|(262144,[66208,12...|[very, bitter, be...| 78.46171430817284|
| 439|The rewery New Be...|[The, rewery, New...|(262144,[535,3456...|(262144,[535,3456..