In [18]:
# One-time setup, left disabled: uncomment to install PySpark if it is missing.
#!pip install pyspark

In [19]:
import pyspark

In [20]:
# Memory requests that the SparkContext will be started with.
sc_conf = pyspark.SparkConf()
# set() returns the conf itself, so the calls can be chained; the chained
# expression is the cell's last value and is what the notebook displays.
(sc_conf
 .set('spark.executor.memory', '16g')
 .set('spark.driver.memory', '16g'))

<pyspark.conf.SparkConf at 0x7fd2504107f0>

In [21]:
# getOrCreate() returns the SparkContext that is already running in this
# kernel, or starts one from sc_conf if none exists. Calling the constructor
# directly fails on a second execution of this cell with
# "ValueError: Cannot run multiple SparkContexts at once".
# NOTE: when a context already exists it is returned as-is; sc_conf is only
# applied when a new context has to be created.
sc = pyspark.SparkContext.getOrCreate(conf=sc_conf)

In [22]:
# List every (key, value) setting held by the running SparkContext's configuration.
sc.getConf().getAll()

[('spark.executor.memory', '16g'),
 ('spark.driver.host', '192.168.150.209'),
 ('spark.driver.port', '37799'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.id', 'local-1575990677583'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.memory', '16g'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]

In [23]:
# Bare expression: the notebook renders the SparkContext as this cell's output.
sc

In [24]:
# Obtain the session through the builder, the documented entry point, rather
# than calling the SparkSession constructor directly. getOrCreate() returns the
# session that already exists in this kernel, or creates one on top of the
# active SparkContext (the `sc` created above), so re-running the cell does
# not build a fresh session wrapper each time.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [25]:
# Artist lookup table: tab-separated text, header option left at its default,
# column types inferred by Spark. The auto-named _c0/_c1 columns are exposed
# as artist_id / artist_name.
artists_ddf = (
    spark.read
    .csv('./SparkR/artist_data.txt', sep='\t', header=None, inferSchema=True)
    .selectExpr(
        '_c0 as artist_id',
        '_c1 as artist_name',
    )
)

In [26]:
# Print the first five rows to check the renamed columns.
artists_ddf.show(5)

+---------+--------------------+
|artist_id|         artist_name|
+---------+--------------------+
|  1134999|        06Crazy Life|
|  6821360|        Pang Nakarin|
| 10113088|Terfel, Bartoli- ...|
| 10151459| The Flaming Sidebur|
|  6826647|   Bodenstandig 3000|
+---------+--------------------+
only showing top 5 rows



In [27]:
# Play-count records: space-separated text, header option left at its default,
# column types inferred by Spark. The three auto-named columns become
# user_id / artist_id / playcount. The frame is cached because later cells
# reuse it (preview and alias join).
counts_ddf = (
    spark.read
    .csv('./SparkR/user_artist_data.txt', sep=' ', header=None, inferSchema=True)
    .selectExpr(
        '_c0 as user_id',
        '_c1 as artist_id',
        '_c2 as playcount',
    )
    .cache()
)

In [28]:
# Print the first five play-count rows to check the renamed columns.
counts_ddf.show(5)

+-------+---------+---------+
|user_id|artist_id|playcount|
+-------+---------+---------+
|1000002|        1|       55|
|1000002|  1000006|       33|
|1000002|  1000007|        8|
|1000002|  1000009|      144|
|1000002|  1000010|      314|
+-------+---------+---------+
only showing top 5 rows



In [29]:
# Artist alias table: tab-separated text, header option left at its default,
# column types inferred by Spark. Each row pairs a bad_id with the good_id
# that should replace it. Cached because later cells reuse it (preview and join).
good_ddf = (
    spark.read
    .csv('./SparkR/artist_alias.txt', sep='\t', header=None, inferSchema=True)
    .selectExpr(
        '_c0 as bad_id',
        '_c1 as good_id',
    )
    .cache()
)

In [30]:
# Print the first five alias rows to check the renamed columns.
good_ddf.show(5)

+--------+-------+
|  bad_id|good_id|
+--------+-------+
| 1092764|1000311|
| 1095122|1000557|
| 6708070|1007267|
|10088054|1042317|
| 1195917|1042317|
+--------+-------+
only showing top 5 rows



In [31]:
import pyspark.sql.functions as F

In [32]:
# Resolve aliased artist ids and total the play counts per (user, resolved artist).
valid_artists_ddf = (
    counts_ddf
    # Left join keeps every play-count row; good_id is null where the
    # artist_id has no entry in the alias table.
    .join(
        good_ddf.withColumnRenamed('bad_id', 'artist_id'),
        on='artist_id',
        how='left',
    )
    # Take the alias target when there is one, otherwise keep the original id.
    .withColumn(
        'valid_artist_id',
        F.coalesce(F.col('good_id'), F.col('artist_id')),
    )
    # Rows that resolve to the same artist for a user are summed together.
    .groupBy('user_id', 'valid_artist_id')
    .agg(F.sum('playcount').alias('total_count'))
)

In [33]:
# Print the first five aggregated rows; this action triggers the join and aggregation.
valid_artists_ddf.show(5)

+-------+---------------+-----------+
|user_id|valid_artist_id|total_count|
+-------+---------------+-----------+
|1000002|        1004315|          1|
|1000002|        1004395|          1|
|1000002|        1035248|         17|
|1000002|           1198|         51|
|1000002|            344|         18|
+-------+---------------+-----------+
only showing top 5 rows



In [34]:
# Shutdown calls, left disabled: uncomment to stop the session and the context
# once the analysis is finished.
#spark.stop()
#sc.stop()