In [1]:
spark

In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import RegexTokenizer
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_set, collect_list, lit, sum, udf, concat_ws, col, count, abs, date_format, max, \
    from_utc_timestamp, expr
from pyspark.sql.functions import explode, posexplode
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.window import Window

In [3]:
# Load
path = "/media/workspace/DeepFood/deep-reco-gym/output/trivago/dataset/all/train__size=-1.csv"



df   = spark.read.csv(path, header=True, inferSchema=True).limit(50000)
df   = df.withColumn("idx", F.monotonically_increasing_id())

In [4]:
df.limit(2).toPandas()

Unnamed: 0,user_id,session_id,timestamp,step,action_type,reference,platform,city,device,current_filters,impressions,prices,impressions_array,idx
0,00RL8Z82B2Z1,aff3928535f48,1541037460,1,search for poi,Newtown,AU,"Sydney, Australia",mobile,,,,,0
1,00RL8Z82B2Z1,aff3928535f48,1541037522,2,interaction item image,666856,AU,"Sydney, Australia",mobile,,,,,1


### Tokenize

In [5]:
tokenizer    = RegexTokenizer(inputCol="current_filters", outputCol="current_filters_tokens", pattern="\|")
df_tokenized = tokenizer.transform(df.fillna({'current_filters': ''}))

In [6]:
df_tokenized.select("current_filters_tokens").limit(2).toPandas()

Unnamed: 0,current_filters_tokens
0,[]
1,[]


In [7]:
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="current_filters_tokens", outputCol="list_current_filters", vocabSize=10)
cv_model = cv.fit(df_tokenized)
df_cv    = cv_model.transform(df_tokenized)

In [8]:
df_cv.select("list_current_filters").limit(2).toPandas()

Unnamed: 0,list_current_filters
0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [9]:
cv_model.vocabulary

['hotel',
 '5 star',
 'resort',
 'sort by price',
 '4 star',
 'motel',
 'hostal (es)',
 'focus on distance',
 '3 star',
 'best value']

## Array to Column

In [10]:
vocabulary_columns = ["current_filters_"+c.replace(" ", "_") for c in cv_model.vocabulary]
vocabulary_columns

['current_filters_hotel',
 'current_filters_5_star',
 'current_filters_resort',
 'current_filters_sort_by_price',
 'current_filters_4_star',
 'current_filters_motel',
 'current_filters_hostal_(es)',
 'current_filters_focus_on_distance',
 'current_filters_3_star',
 'current_filters_best_value']

In [11]:
from pyspark.ml.linalg import Vectors


In [12]:
def extract(row):
    return (row.idx, ) + tuple(row.list_current_filters.toArray().tolist())

df_cv_2 = df_cv.rdd.map(extract).toDF(["idx"] + cv_model.vocabulary)  # Vector values will be named _2, _3, ...

In [13]:
df_cv_2.limit(2).toPandas()

Unnamed: 0,idx,hotel,5 star,resort,sort by price,4 star,motel,hostal (es),focus on distance,3 star,best value
0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [14]:
df_cv_2.filter(df_cv_2.hotel == 1).show(2)

+---+-----+------+------+-------------+------+-----+-----------+-----------------+------+----------+
|idx|hotel|5 star|resort|sort by price|4 star|motel|hostal (es)|focus on distance|3 star|best value|
+---+-----+------+------+-------------+------+-----+-----------+-----------------+------+----------+
|126|  1.0|   1.0|   1.0|          0.0|   1.0|  1.0|        1.0|              0.0|   0.0|       0.0|
|127|  1.0|   1.0|   1.0|          0.0|   1.0|  1.0|        1.0|              0.0|   0.0|       0.0|
+---+-----+------+------+-------------+------+-----+-----------+-----------------+------+----------+
only showing top 2 rows



## Join

In [16]:
df = df_cv.join(df_cv_2, df_cv.idx == df_cv_2.idx)

In [41]:
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.linalg import DenseVector

def sparse_to_array(v):
    v = DenseVector(v)
    new_array = list([float(x) for x in v])
    return new_array
sparse_to_array_udf = udf(sparse_to_array, ArrayType(FloatType()))

#df = df.withColumn('features_array', sparse_to_array_udf('features'))

#sparce_to_dense = udf(lambda x: x.toArray(), ArrayType(IntegerType()))
df2 = df.withColumn('list_current_filters', sparse_to_array_udf('list_current_filters')).cache()

In [42]:
df2.select('list_current_filters').show(1)

+--------------------+
|list_current_filters|
+--------------------+
|[0.0, 0.0, 0.0, 0...|
+--------------------+
only showing top 1 row



In [43]:
df2.select('list_current_filters').limit(100).toPandas()

Unnamed: 0,list_current_filters
0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,"[1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ..."
3,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
...,...
95,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
96,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
97,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
98,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [17]:
_df = df.limit(100).toPandas()

In [23]:
_df[['list_current_filters']]

Unnamed: 0,list_current_filters
0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,"(1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ..."
3,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
...,...
95,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
96,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
97,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
98,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [19]:
list(_df.iloc[2].list_current_filters)

[1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0]

In [20]:
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- step: integer (nullable = true)
 |-- action_type: string (nullable = true)
 |-- reference: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- city: string (nullable = true)
 |-- device: string (nullable = true)
 |-- current_filters: string (nullable = false)
 |-- impressions: string (nullable = true)
 |-- prices: string (nullable = true)
 |-- impressions_array: string (nullable = true)
 |-- idx: long (nullable = false)
 |-- current_filters_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- list_current_filters: vector (nullable = true)
 |-- idx: long (nullable = true)
 |-- hotel: double (nullable = true)
 |-- 5 star: double (nullable = true)
 |-- resort: double (nullable = true)
 |-- sort by price: double (nullable = true)
 |-- 4 star: double (nullable = true)
 |-- motel: double (nullable = true)
 |-- h