In [8]:
import findspark
findspark.init()

import matplotlib.pyplot as plt
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# Spark session & context
# Builds (or reuses) a session against the standalone master, loading the
# MongoDB connector package and pointing the connector's input and output
# URIs at test.myCollection.
spark = (
    SparkSession.builder
    .master("spark://spark:7077")
    .appName("jupyter-notebook-analytics")
    .config("spark.driver.memory", "512m")
    .config("spark.mongodb.input.uri", "mongodb://mongodb:27017/test.myCollection")
    .config("spark.mongodb.output.uri", "mongodb://mongodb:27017/test.myCollection")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2")
    .getOrCreate()
)
spark

In [3]:
# SparkContext behind the session; used below to build RDDs directly.
sc = spark.sparkContext
sc

In [4]:
# Sum of the whole numbers from 0 to 1000 (range(1000+1) includes 1000)
from pyspark.rdd import RDD

rdd = sc.parallelize(range(1000+1))
rdd.sum()

500500

# Mongo Spark Connector
Example


In [5]:
# Small (name, age) sample frame used to exercise the connector.
people = spark.createDataFrame(
    [
        ("Bilbo Baggins", 50),
        ("Gandalf", 1000),
        ("Thorin", 195),
        ("Balin", 178),
        ("Kili", 77),
        ("Dwalin", 169),
        ("Oin", 167),
        ("Gloin", 158),
        ("Fili", 82),
        ("Bombur", 50),
    ],
    schema='name string, age int',
)

# No uri option is passed, so the write relies on the session-level
# spark.mongodb.output.uri setting. The mode is "append", so every re-run of
# this cell inserts the ten rows again.
people_writer = people.write.format("mongo").mode("append")
people_writer.save()
# people.write.format("mongodb").mode("append").save()

+-------------+----+
|         name| age|
+-------------+----+
|Bilbo Baggins|  50|
|      Gandalf|1000|
|       Thorin| 195|
|        Balin| 178|
|         Kili|  77|
|       Dwalin| 169|
|          Oin| 167|
|        Gloin| 158|
|         Fili|  82|
|       Bombur|  50|
+-------------+----+



In [None]:
# To write to a different MongoDB collection, chain .option() calls on the
# writer (people.write). For example, to write to a collection called
# contacts in a database called people, pass both names as options:
# OLD: people.write.format("mongodb").mode("append").option("database","people").option("collection", "contacts").save()
# people.write.format("mongo").mode("append").option("database", "people").option("collection", "contacts").save()
people.show()

In [7]:
# Print the column names and types declared for the people DataFrame.
people.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [8]:
# Read back from MongoDB. No uri option is passed, so the read relies on the
# session-level spark.mongodb.input.uri setting.
# df = spark.read.format("mongodb").load()
df = spark.read.format("mongo").load()
df.printSchema()
# Alternative: name the database/collection explicitly through the uri option.
# df = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/people.contacts").load()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [9]:
# Aggregation pipeline handed to the connector as a string: a single $match
# stage keeping only documents whose name is 'Gloin'.
pipeline = "{'$match': {'name': 'Gloin'}}"
# NOTE: this rebinds df, replacing the unfiltered DataFrame loaded earlier.
df = spark.read.format("mongo").option("pipeline", pipeline).load()
df.show()



+--------------------+---+-----+
|                 _id|age| name|
+--------------------+---+-----+
|{63c6cc2814c2b47f...|158|Gloin|
+--------------------+---+-----+



In [10]:
# NOTE(review): the Analytics cells below call spark.read again. After this
# stop() they fail until the session cell is re-run, so a top-to-bottom
# "Run All" breaks here. Consider moving this call to the end of the notebook.
spark.stop()

# Analytics

In [5]:
# Load the full collection for the analytics section (no pipeline filter).
# NOTE(review): the schema printed below has product/crawl fields rather than
# the name/age fields of the people example, so the session presumably pointed
# at a different collection when this cell was run — TODO confirm the input uri.
#df = spark.read.format("mongo").option("pipeline", pipeline).load()
df = spark.read.format("mongo").load()
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- country: string (nullable = true)
 |-- crawlID: string (nullable = true)
 |-- currentURL: string (nullable = true)
 |-- enseigneID: string (nullable = true)
 |-- enseigneName: string (nullable = true)
 |-- job_id: string (nullable = true)
 |-- language: string (nullable = true)
 |-- other: string (nullable = true)
 |-- productBrand: string (nullable = true)
 |-- productCategory: string (nullable = true)
 |-- productDate: string (nullable = true)
 |-- productEAN: string (nullable = true)
 |-- productID: string (nullable = true)
 |-- productImage: string (nullable = true)
 |-- productIsAvailable: long (nullable = true)
 |-- productIsBio: long (nullable = true)
 |-- productIsFreezer: long (nullable = true)
 |-- productIsFresh: long (nullable = true)
 |-- productIsNew: long (nullable = true)
 |-- productLinkDetail: string (nullable = true)
 |-- productName: string (nullable = true)
 |-- productPackaging: s

In [17]:
# Get all column names, in order
df.columns

['_id',
 'country',
 'crawlID',
 'currentURL',
 'enseigneID',
 'enseigneName',
 'job_id',
 'language',
 'other',
 'productBrand',
 'productCategory',
 'productDate',
 'productEAN',
 'productID',
 'productImage',
 'productIsAvailable',
 'productIsBio',
 'productIsFreezer',
 'productIsFresh',
 'productIsNew',
 'productLinkDetail',
 'productName',
 'productPackaging',
 'productPosition',
 'productPrice',
 'productPriceBase',
 'productPricePrevious',
 'productPromotionText',
 'productQuantityRating',
 'productRating',
 'productUnit',
 'productValueUnit',
 'promotion',
 'shopID',
 'typeCrawler',
 'zip_code']

In [13]:
# RDD view of the DataFrame. NOTE: this rebinds rdd, which earlier held the
# range used in the sum example.
rdd = df.rdd

pyspark.rdd.RDD

In [24]:
# Sample without replacement at a 0.1 fraction; the fixed seed makes the
# sample repeatable across runs.
rdd2 = rdd.sample(withReplacement=False, fraction=0.1, seed=81)
type(rdd2)

pyspark.rdd.PipelinedRDD

In [28]:
# Turn the sampled RDD back into a DataFrame, reusing df's column names.
df2 = rdd2.toDF(df.columns)

In [None]:
# Distinct product categories present in the sample.
# DataFrame.show() prints the table itself and returns None, so capturing its
# result and printing it only added a stray "None" to the output.
df2.select("productCategory").distinct().show()

In [None]:
# maybe faster as rdd

## Analysis of out-of-stock products by category

In [None]:
# Out-of-stock products per category over time: one line per category,
# x-axis = productDate, y-axis = number of unavailable products.

# NOTE(review): the printed schema reports productIsAvailable as a long, so the
# earlier comparison against the string "no" could never match a row. 0 is
# assumed to mean "unavailable" — TODO confirm the encoding used by the crawler
# (the flag might instead be null for unavailable products).
UNAVAILABLE_FLAG = 0

# DataFrame.show() only prints and returns None, so the distinct values are
# collected into Python lists before being iterated.
categories = [
    row["productCategory"]
    for row in df.select("productCategory").distinct().collect()
]
# Sorted so the x-axis is ordered; assumes productDate strings sort
# chronologically (e.g. ISO dates) — TODO confirm.
dates = sorted(
    row["productDate"]
    for row in df.select("productDate").distinct().collect()
    if row["productDate"] is not None
)

# A single grouped aggregation replaces one Spark job per (category, date) pair.
unavailable_rows = (
    df.filter(df.productIsAvailable == UNAVAILABLE_FLAG)
    .groupBy("productCategory", "productDate")
    .count()
    .collect()
)

# category -> {date -> unavailable count}
unavailable_by_category = {}
for row in unavailable_rows:
    per_date = unavailable_by_category.setdefault(row["productCategory"], {})
    per_date[row["productDate"]] = row["count"]

fig, ax = plt.subplots()
for category in categories:
    per_date = unavailable_by_category.get(category, {})
    # One y-value per date; dates with no unavailable product count as 0.
    ax.plot(dates, [per_date.get(date, 0) for date in dates], label=str(category))

ax.set(
    title="Unavailable products by category",
    xlabel="productDate",
    ylabel="unavailable products",
)
ax.legend()
plt.show()

## Analysis of out-of-stock products by departments

In [None]:
# Out-of-stock products per department over time: one line per department,
# x-axis = productDate, y-axis = number of unavailable products.

# NOTE(review): the department is derived from the leading characters of
# zip_code. Two characters are assumed (French-style postal codes); the earlier
# draft sliced a single character — TODO confirm the intended prefix length.
DEPARTMENT_PREFIX_LENGTH = 2
# NOTE(review): productIsAvailable is a long in the printed schema, so comparing
# it with the string "no" never matched. 0 is assumed to mean "unavailable" —
# TODO confirm.
UNAVAILABLE_FLAG = 0

# Derive the department as a column. This replaces sc.parallelize(df), which
# cannot take a DataFrame, and keeps the computation on the cluster.
df_with_department = df.withColumn(
    "department",
    df.zip_code.cast("string").substr(1, DEPARTMENT_PREFIX_LENGTH),
)

# DataFrame.show() returns None, so collect the distinct values into lists.
departments = sorted(
    row["department"]
    for row in df_with_department.select("department").distinct().collect()
    if row["department"] is not None
)
# Computed here as well so this cell does not depend on another cell's state.
# Assumes productDate strings sort chronologically — TODO confirm.
dates = sorted(
    row["productDate"]
    for row in df.select("productDate").distinct().collect()
    if row["productDate"] is not None
)

# A single grouped aggregation replaces one Spark job per (department, date) pair.
unavailable_rows = (
    df_with_department
    .filter(df_with_department.productIsAvailable == UNAVAILABLE_FLAG)
    .groupBy("department", "productDate")
    .count()
    .collect()
)

# department -> {date -> unavailable count}
unavailable_by_department = {}
for row in unavailable_rows:
    per_date = unavailable_by_department.setdefault(row["department"], {})
    per_date[row["productDate"]] = row["count"]

fig, ax = plt.subplots()
for department in departments:
    per_date = unavailable_by_department.get(department, {})
    # One y-value per date; dates with no unavailable product count as 0.
    ax.plot(dates, [per_date.get(date, 0) for date in dates], label=str(department))

ax.set(
    title="Unavailable products by department",
    xlabel="productDate",
    ylabel="unavailable products",
)
ax.legend()
plt.show()

## Evolution of prices over time and by department

In [None]:
# Mean product price per department and date, computed in one aggregation.
# The earlier draft re-ran the same whole-frame groupBy once per
# (department, date) pair and iterated over the None returned by .show().

# NOTE(review): the department is derived from the leading characters of
# zip_code; two characters are assumed (French-style postal codes) — TODO confirm.
DEPARTMENT_PREFIX_LENGTH = 2

# NOTE(review): the type of productPrice is not visible in the printed schema;
# if it is stored as a string, cast it to a numeric type before averaging.
price_by_department = (
    df.withColumn(
        "department",
        df.zip_code.cast("string").substr(1, DEPARTMENT_PREFIX_LENGTH),
    )
    .groupBy("department", "productDate")
    .agg(
        F.mean("productPrice").alias("mean_price"),
        F.count("productPrice").alias("n_prices"),
    )
    .orderBy("department", "productDate")
)
price_by_department.show()


## Market share of customers (based on the brands)

## Market shares by group