In [12]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Bootstrap the PySpark shell inside this kernel: running shell.py prints the
# Spark banner and makes a session available as `spark`.
# `execfile` only exists on Python 2, so read + compile + exec the script
# instead; this form works unchanged on both Python 2.7 and Python 3.
# Raises KeyError if SPARK_HOME is not set.
_pyspark_shell = os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')
# Binary mode lets compile() honour any encoding declaration in the script.
with open(_pyspark_shell, 'rb') as _shell_source:
    # Passing the real path to compile() keeps tracebacks pointing at shell.py.
    exec(compile(_shell_source.read(), _pyspark_shell, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.


In [2]:
# Obtain a Hive-enabled session against the "local" master. getOrCreate()
# returns an already-running session when one exists rather than starting
# a second one.
sparkSession = (
    SparkSession.builder
    .enableHiveSupport()
    .master("local")
    .getOrCreate()
)

<h3>Ingest parquet files</h3>

In [3]:
# Parquet inputs: the listening log and the track/artist metadata table.
LISTENS_PATH = "/data/sample264"
META_PATH = "/data/meta"

# Each read uses its own reader so the two loads stay independent.
data = sparkSession.read.parquet(LISTENS_PATH)
meta = sparkSession.read.parquet(META_PATH)

<h4>Look into the "data" file</h4>
 - trackId - id of the track
 - userId - id of the user
 - artistId - id of the artist
 - timestamp - timestamp of the moment the user starts listening to a track

In [4]:
# Print the column names, types and nullability of the listening log.
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- trackId: integer (nullable = true)
 |-- artistId: integer (nullable = true)
 |-- timestamp: long (nullable = true)



In [6]:
# Peek at the first three rows of the listening log.
data.show(n=3)

+------+-------+--------+----------+
|userId|trackId|artistId| timestamp|
+------+-------+--------+----------+
| 13065| 944906|  978428|1501588527|
|101897| 799685|  989262|1501555608|
|215049| 871513|  988199|1501604269|
+------+-------+--------+----------+
only showing top 3 rows



<h4>Look into the "meta" file</h4>
 - Type could be “track” or “artist”
 - Name is the title of the track if the type == “track” and the name of the musician or group if the type == “artist”.
 - Artist stands for the creator of the track in case the type == “track” and for the name of the musician or group in case the type == “artist”.
 - Id - id of the item

In [5]:
# Print the column names, types and nullability of the metadata table.
meta.printSchema()

root
 |-- type: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Artist: string (nullable = true)
 |-- Id: integer (nullable = true)



In [10]:
# Peek at the first three rows of the metadata table (long strings are truncated).
meta.show(n=3)

+-----+--------------------+--------------------+-------+
| type|                Name|              Artist|     Id|
+-----+--------------------+--------------------+-------+
|track|               Smile| Artist: Josh Groban|1223851|
|track|Chuni Ashkharhe Q...|Artist: Razmik Amyan|1215486|
|track|           Dark City|Artist: Machinae ...|1296462|
+-----+--------------------+--------------------+-------+
only showing top 3 rows



<h3>Create empty dataframe for edges and weight</h3>

In [14]:
# Edge-list layout: a (from_track, to_track) pair plus an integer weight.
# All three columns are nullable integers.
schema_edges = (
    StructType()
    .add("from_track", IntegerType(), True)
    .add("to_track", IntegerType(), True)
    .add("weight", IntegerType(), True)
)

In [15]:
# Start from an empty frame that already carries the edge schema.
# Use the notebook's explicitly built `sparkSession` handle (the same one used
# to read the parquet inputs) instead of the implicit `spark` global injected
# by shell.py, so this cell does not depend on that hidden name.
edge_list = sparkSession.createDataFrame([], schema=schema_edges)

In [16]:
# Confirm the empty frame picked up the three integer edge columns.
edge_list.printSchema()

root
 |-- from_track: integer (nullable = true)
 |-- to_track: integer (nullable = true)
 |-- weight: integer (nullable = true)



In [17]:
# The frame has no rows yet, so only the header is printed.
edge_list.show(n=3)

+----------+--------+------+
|from_track|to_track|weight|
+----------+--------+------+
+----------+--------+------+

