In [32]:
from pyspark.sql import SparkSession

# Start (or reuse, if one is already running in this kernel) the Spark session.
spark = (
    SparkSession.builder
    .appName('amit')
    .getOrCreate()
)

In [33]:
from functools import reduce

# Load the movie catalogue. It is read without header=True, so Spark assigns
# default column names (_c0, _c1, ...); assign the real names positionally.
u_item = spark.read.csv('u.item', sep='|')

u_item_new_columns = [
    'item-id', 'movie-title', 'release-date', 'video-release-date', 'IMDB-url',
    'unknown', 'action', 'adventure', 'animation', "children's", 'comedy', 'crime',
    'documentary', 'drama', 'fantasy', 'film-noir', 'horror', 'musical', 'mystery',
    'romance', 'sci-fi', 'thriller', 'war', 'western',
]

# toDF requires exactly one name per column, so a schema mismatch fails loudly
# instead of leaving some columns under their default _cN names.
u_item = u_item.toDF(*u_item_new_columns)

# Dates and the IMDB link are not used by the analysis below.
u_item = u_item.drop('release-date', 'video-release-date', 'IMDB-url')

u_item.count()

1682

In [34]:
# Load the ratings table (tab-separated, read without header=True, so the
# columns arrive as _c0.._cN and are renamed positionally below).
u_data = spark.read.csv('u.data', sep='\t')

u_data_new_columns = ['user-id', 'item-id', 'rating', 'timestamp']

# toDF requires exactly one name per column, so a file with a different column
# count fails loudly instead of silently keeping unnamed _cN columns.
u_data = u_data.toDF(*u_data_new_columns)

# The rating time is not used by the analysis below.
u_data = u_data.drop('timestamp')

u_data.count()

100000

In [35]:
# Load the user table ('|'-separated, read without header=True, so columns
# arrive as _c0.._cN). Note every column, including age, is read as a string.
u_user = spark.read.csv('u.user', sep='|')

u_user_new_columns = ['user-id', 'age', 'gender', 'occupation', 'zip code']

# toDF requires exactly one name per column, so a schema mismatch fails loudly.
u_user = u_user.toDF(*u_user_new_columns)

# Only user-id, age and occupation are needed downstream.
u_user = u_user.drop('gender', 'zip code')

u_user.show()

+-------+---+-------------+
|user-id|age|   occupation|
+-------+---+-------------+
|      1| 24|   technician|
|      2| 53|        other|
|      3| 23|       writer|
|      4| 24|   technician|
|      5| 33|        other|
|      6| 42|    executive|
|      7| 57|administrator|
|      8| 36|administrator|
|      9| 29|      student|
|     10| 53|       lawyer|
|     11| 39|        other|
|     12| 28|        other|
|     13| 47|     educator|
|     14| 45|    scientist|
|     15| 49|     educator|
|     16| 21|entertainment|
|     17| 30|   programmer|
|     18| 35|        other|
|     19| 40|    librarian|
|     20| 42|    homemaker|
+-------+---+-------------+
only showing top 20 rows



In [36]:
# Attach every rating to its movie's title and genre flags via the shared 'item-id' key
# (inner join; 'item-id' appears once, as the first column).
item_data_join = u_item.join(u_data, on=['item-id'], how='inner')
item_data_join.show(n=20)

+-------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-------+------+
|item-id|         movie-title|unknown|action|adventure|animation|children's|comedy|crime|documentary|drama|fantasy|film-noir|horror|musical|mystery|romance|sci-fi|thriller|war|western|user-id|rating|
+-------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-------+------+
|    242|        Kolya (1996)|      0|     0|        0|        0|         0|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|    196|     3|
|    302|L.A. Confidential...|      0|     0|        0|        0|         0|     0|    1|          0|    0|      0|        1|     0|      0|      1|      0|     0|       1|  0|      0|    186|     3|


In [37]:
# Add each rater's age and occupation to the rating rows (inner join on 'user-id'),
# preview the result and report the row count.
final_join = item_data_join.join(u_user, on=['user-id'], how='inner')
final_join.show(n=20)
final_join.count()

+-------+-------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+---+-------------+
|user-id|item-id|         movie-title|unknown|action|adventure|animation|children's|comedy|crime|documentary|drama|fantasy|film-noir|horror|musical|mystery|romance|sci-fi|thriller|war|western|rating|age|   occupation|
+-------+-------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+---+-------------+
|    196|    242|        Kolya (1996)|      0|     0|        0|        0|         0|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     3| 49|       writer|
|    186|    302|L.A. Confidential...|      0|     0|        0|        0|         0|     0|    1|          0|    0|      0|     

100000

In [38]:
from pyspark.sql.functions import when

# Bucket each rating's user into an age group and keep occupation, rating and the
# 19 genre flag columns.
#
# The age column is referenced through final_join itself, not through u_user, so the
# expression still resolves if the u_user cell is re-run. u.user was read without a
# schema, so age is a string and is cast to int before comparing.
age_years = final_join['age'].cast('int')

result_df = final_join.select(
    "occupation",
    # Ages below 20 previously fell through to .otherwise("45+"); give them their own
    # bucket. Ages that are missing or fail the cast now yield a null age_group
    # instead of being mislabelled "45+".
    when(age_years < 20, "under-20")
    .when(age_years <= 25, "20-25")
    .when(age_years <= 35, "26-35")
    .when(age_years <= 45, "36-45")
    .when(age_years > 45, "45+")  # i.e. 46 and older; label kept for compatibility
    .alias("age_group"),
    'rating',
    'unknown', 'action', 'adventure', 'animation', "children's", 'comedy', 'crime',
    'documentary', 'drama', 'fantasy', 'film-noir', 'horror', 'musical', 'mystery',
    'romance', 'sci-fi', 'thriller', 'war', 'western',
)


In [39]:
# Preview the bucketed frame (20 rows, Spark's default page size).
result_df.show(n=20)

+-------------+---------+------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|   occupation|age_group|rating|unknown|action|adventure|animation|children's|comedy|crime|documentary|drama|fantasy|film-noir|horror|musical|mystery|romance|sci-fi|thriller|war|western|
+-------------+---------+------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|       writer|      45+|     3|      0|     0|        0|        0|         0|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|
|    executive|    36-45|     3|      0|     0|        0|        0|         0|     0|    1|          0|    0|      0|        1|     0|      0|      1|      0|     0|       1|  0|      0|
|       writer|    20-25|     1|      0|     0|        0|        

In [40]:
from pyspark.sql.functions import array, when, col, array_remove, udf, filter
import pyspark.sql.functions as func
from pyspark.sql.types import StringType, ArrayType

# The genre flag columns are everything in result_df except the three descriptive
# columns. Selecting them by name rather than by position keeps this cell correct if
# result_df's column order changes.
genres_columns = [
    c for c in result_df.columns if c not in ('occupation', 'age_group', 'rating')
]

# For each flag equal to 1 emit the genre's name, otherwise null. The flags were read
# as strings, and the comparison with 1 relies on Spark's implicit cast.
genre_array = array([when(col(col_name) == 1, col_name) for col_name in genres_columns])

new_df = (
    result_df
    # Drop the nulls explicitly. Comparing against the string 'null' only
    # worked because NULL != 'null' evaluates to NULL, which filter treats as false.
    .withColumn('genres', func.filter(genre_array, lambda g: g.isNotNull()))
    # The individual flag columns are now folded into 'genres'.
    .drop(*genres_columns)
)

new_df.show()


+-------------+---------+------+--------------------+
|   occupation|age_group|rating|              genres|
+-------------+---------+------+--------------------+
|       writer|      45+|     3|            [comedy]|
|    executive|    36-45|     3|[crime, film-noir...|
|       writer|    20-25|     1|[children's, comedy]|
|   technician|    26-35|     2|[drama, romance, ...|
|     educator|      45+|     1|      [crime, drama]|
|    executive|    36-45|     4|       [sci-fi, war]|
|     engineer|    26-35|     2|  [action, thriller]|
|    librarian|    26-35|     5|[adventure, child...|
|   programmer|    20-25|     3|[comedy, musical,...|
|    executive|    36-45|     3|             [drama]|
|administrator|    26-35|     2|[action, adventur...|
|      student|    26-35|     5|            [comedy]|
|   programmer|    36-45|     5|[action, adventur...|
|     engineer|    36-45|     3|            [comedy]|
|     educator|    26-35|     3|[action, adventur...|
|      student|      45+|   

In [41]:
from pyspark.sql.functions import explode

# One row per (rating, genre). Explode the genres array straight into a column named
# 'genre' (rows whose array is empty disappear), then discard the array itself.
new_df_exploded = (
    new_df
    .withColumn('genre', explode('genres'))
    .drop('genres')
)
new_df_exploded.show()

+----------+---------+------+----------+
|occupation|age_group|rating|     genre|
+----------+---------+------+----------+
|    writer|      45+|     3|    comedy|
| executive|    36-45|     3|     crime|
| executive|    36-45|     3| film-noir|
| executive|    36-45|     3|   mystery|
| executive|    36-45|     3|  thriller|
|    writer|    20-25|     1|children's|
|    writer|    20-25|     1|    comedy|
|technician|    26-35|     2|     drama|
|technician|    26-35|     2|   romance|
|technician|    26-35|     2|       war|
|technician|    26-35|     2|   western|
|  educator|      45+|     1|     crime|
|  educator|      45+|     1|     drama|
| executive|    36-45|     4|    sci-fi|
| executive|    36-45|     4|       war|
|  engineer|    26-35|     2|    action|
|  engineer|    26-35|     2|  thriller|
| librarian|    26-35|     5| adventure|
| librarian|    26-35|     5|children's|
| librarian|    26-35|     5|   romance|
+----------+---------+------+----------+
only showing top

In [42]:
# Distinct (occupation, age_group, genre) combinations present in the exploded ratings.
#
# Grouping by these keys, computing max(rating) and then dropping it is exactly
# a distinct projection, so do that directly instead of computing a discarded aggregate.
# NOTE(review): if the intent was something like "highest-rated genre per
# (occupation, age_group)", that needs the aggregate kept plus a window/ranking step.
# Confirm the intended output.
df = new_df_exploded.select('occupation', 'age_group', 'genre').distinct()

In [43]:
# Preview the combinations (20 rows, Spark's default page size; order is not guaranteed).
df.show(n=20)

+-------------+---------+---------+
|   occupation|age_group|    genre|
+-------------+---------+---------+
|   programmer|    26-35|  romance|
|    marketing|      45+|   comedy|
|       artist|    36-45|   horror|
|     educator|    20-25|    drama|
|         none|    20-25|  unknown|
|    marketing|    26-35|   action|
|       artist|      45+| thriller|
|    marketing|    36-45|  fantasy|
|    homemaker|    26-35|    crime|
|        other|    36-45|film-noir|
|     salesman|    36-45| thriller|
|       writer|    26-35|  unknown|
|     educator|    26-35|    crime|
|       doctor|    26-35| thriller|
|   technician|    26-35|  mystery|
|    librarian|    20-25|   horror|
|     salesman|    36-45|    drama|
|administrator|      45+|    drama|
|   programmer|    26-35|  fantasy|
|   programmer|      45+|  mystery|
+-------------+---------+---------+
only showing top 20 rows

