In [18]:
#!pip install pyspark
#!pip install -q kaggle

from pyspark.sql import SparkSession
import zipfile
import os

# create spark session
spark = SparkSession.builder\
          .master("local[10]")\
          .appName("app-apache-spark")\
          .config('spark.ui.port','4050')\
          .getOrCreate()

In [None]:
# descargar dataset
!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download mkechinov/ecommerce-events-history-in-cosmetics-shop

In [None]:
#descomprimir el.zip del dataset
for file in os.listdir():
  print(file)
  if file.endswith('.zip'):
    zip_ref = zipfile.ZipFile(file,'r')
    zip_ref.extractall()
    zip_ref.close()

In [19]:
# cargar el dataset y crear dataframe
df = spark.read.options(header='True',inferScheme='True').csv('*.csv')

In [20]:
print(df.count())
df.printSchema()



20692840
root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



                                                                                

In [21]:
df.select("event_type").distinct().show()



+----------------+
|      event_type|
+----------------+
|        purchase|
|            view|
|            cart|
|remove_from_cart|
+----------------+



                                                                                

In [None]:
sessions = df.select(['user_session']).filter("event_type='cart' AND product_id=4958").distinct()

# Ejercicio
Obtener que otros productos tambien fueron agregados al carrito junto al producto con id 4958 

In [23]:
sessions = df.select(['user_session']).filter("event_type='cart' AND product_id=4958").distinct()

In [24]:
products = df.select(['product_id']).filter("event_type='cart' AND product_id<>4958").filter( df["user_session"].isin(sessions["user_session"]) )
print(products.count())
products = products.distinct()
print(products.count())

                                                                                

5759760




46156


                                                                                

In [25]:
products.write.mode("overwrite").csv('resultado')

                                                                                

# RDD

In [27]:
def myFunc(s):
  if s["brand"]=="riche" and s["event_type"]=="cart":
    return [ ( s["product_id"],1 ) ]
  return []

In [28]:
lines = df.rdd.flatMap(myFunc).reduceByKey(lambda a, b: a + b)
for element in lines.collect():
    print(element)

Exception in thread "serve RDD 276" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.ServerSocket.implAccept(ServerSocket.java:571)
	at java.net.ServerSocket.accept(ServerSocket.java:534)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:64)

('5842214', 13)
('5842215', 10)
('5842242', 7)
('5842212', 3)
('5842205', 12)
('5842220', 6)
('5922103', 1)
('5842213', 12)
('5842241', 6)
('5842219', 9)
('5842231', 16)
('5922106', 3)
('5842202', 17)
('5842203', 13)
('5922109', 1)
('5842233', 4)
('5922115', 1)
('5922123', 1)
('5922121', 1)
('5842232', 20)
('5842268', 131)
('5922107', 5)
('5922119', 2)
('5922110', 3)
('5922122', 1)
('5842235', 12)
('5842258', 20)
('5844573', 3)
('5922097', 1)
('5842223', 12)
('5846097', 5)
('5846098', 21)
('5922118', 9)
('5842224', 9)
('5922124', 1)
('5922112', 2)
('5922104', 2)
('5842221', 9)
('5842204', 17)
('5842217', 11)
('5842234', 5)
('5842206', 2)
('5844571', 20)
('5842240', 12)
('5922117', 6)
('5842222', 25)
('5842225', 6)
('5842226', 6)
('5842227', 11)
('5922114', 1)
('5922108', 1)


                                                                                

In [None]:
lines.saveAsTextFile("resultado2")

## Variables Broadcast

In [None]:
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

rdd = spark.sparkContext.parallelize(data)

def state_convert(code):
    return broadcastStates.value[code]

result = rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).collect()
print(result)

# SPARK SQL

In [29]:
df.createOrReplaceTempView("data")
spark.sql("SELECT DISTINCT(product_id), brand, price FROM data WHERE price=(SELECT MAX(price) FROM data)").show()



+----------+--------+-----+
|product_id|   brand|price|
+----------+--------+-----+
|   5767003|kosmekka|99.84|
+----------+--------+-----+



                                                                                

# Streaming

sbin ./start-master.sh
sbin ./start-worker.sh spark://127.0.0.1:7077
bin ./spark-submit --master spark://MacBook-Pro-de-Jhon.local:7077 /Users/sebastian/ApacheSpark/ipynb/streaming.py

# ML MACHINE LEARNING

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# la regresión logística es un tipo de análisis de regresión utilizado 
# para predecir el resultado de una variable categórica 
# (una variable que puede adoptar un número limitado de categorías) 
# en función de las variables independientes o predictoras.


# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)

# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # type: ignore

# You can combine paramMaps, which are python dictionaries.
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"}
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)  # type: ignore

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

# Prepare test data
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()

for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))

# GRAPHX - GRAPHFRAMES
