## **Installation:**

To run Spark in Colab, we first need to install all the dependencies in the Colab environment: Apache Spark 3.2.1 with Hadoop 3.2, Java 11, and Findspark to locate Spark on the system. The tools can be installed from within the Colab Jupyter notebook. Follow these steps to install the dependencies:

In [29]:
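# Install Java 11 (headless), download and unpack Spark 3.2.1 built for Hadoop 3.2, and install findspark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark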

Now that Spark and Java are installed in Colab, set the environment variables that let PySpark run in the Colab environment. Set the location of Java and Spark by running the following code:

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

Start the session and test the installation:

In [None]:
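import findspark
findspark.init()  # makes pyspark importable by adding Spark's Python libraries to sys.path

from pyspark.sql import SparkSession

# Create (or reuse) a local session that uses all available cores
spark = SparkSession.builder.master("local[*]").getOrCreate()
# The SparkContext is the entry point for the RDD API used below
sc = spark.sparkContext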

Example of a first transformation

In [None]:
nums = sc.parallelize([1, 2, 3, 4])
# map() is a lazy transformation; collect() is the action that runs it and returns a list
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print(num)

1
4
9
16


Import a CSV from the local **machine**

In [None]:
from google.colab import files

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))

Saving movietweets.csv to movietweets.csv
User uploaded file "movietweets.csv" with length 5528 bytes


First exercise with CSV data (`amigos.csv` must be in the working directory, for example uploaded with the cell above)

In [None]:
def parseLine(line):
    # Each line of amigos.csv looks like "id,name,age,numFriends"
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

In [None]:
lines = sc.textFile("amigos.csv")
rdd = lines.map(parseLine)
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect()
for result in results:
    print(result)

(26, 242.05882352941177)
(40, 250.8235294117647)
(68, 269.6)
(54, 278.0769230769231)
(38, 193.53333333333333)
(56, 306.6666666666667)
(36, 246.6)
(22, 206.42857142857142)
(60, 202.71428571428572)
(30, 235.8181818181818)
(42, 303.5)
(48, 281.4)
(50, 254.6)
(32, 207.9090909090909)
(58, 116.54545454545455)
(64, 281.3333333333333)
(52, 340.6363636363636)
(24, 233.8)
(20, 165.0)
(62, 220.76923076923077)
(44, 282.1666666666667)
(28, 209.1)
(66, 276.44444444444446)
(46, 223.69230769230768)
(18, 343.375)
(34, 245.5)
(33, 325.3333333333333)
(55, 295.53846153846155)
(59, 220.0)
(37, 249.33333333333334)
(27, 228.125)
(53, 222.85714285714286)
(57, 258.8333333333333)
(43, 230.57142857142858)
(35, 211.625)
(45, 309.53846153846155)
(67, 214.625)
(19, 213.27272727272728)
(51, 302.14285714285717)
(25, 197.45454545454547)
(21, 350.875)
(49, 184.66666666666666)
(39, 169.28571428571428)
(31, 267.25)
(41, 268.55555555555554)
(69, 235.2)
(65, 298.2)
(61, 256.22222222222223)
(29, 215.91666666666666)
(47, 233
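
To see what the `mapValues`/`reduceByKey` chain computes, here is a plain-Python sketch of the same per-age (sum, count) aggregation, run on a few records of `amigos.csv` that appear later in this notebook. It runs on a single machine only; in Spark, `reduceByKey` also combines partial sums within each partition before shuffling them. The function names are illustrative:

```python
from collections import defaultdict

def parse_line(line):
    # Same logic as parseLine: columns are id, name, age, number of friends
    fields = line.split(",")
    return int(fields[2]), int(fields[3])

def average_friends_by_age(lines):
    # Equivalent of mapValues(lambda x: (x, 1)) + reduceByKey(...):
    # accumulate [sum_of_friends, count] per age
    totals = defaultdict(lambda: [0, 0])
    for age, friends in map(parse_line, lines):
        totals[age][0] += friends
        totals[age][1] += 1
    # Equivalent of mapValues(lambda x: x[0] / x[1])
    return {age: s / n for age, (s, n) in totals.items()}

sample = ["0,Will,33,385", "1,Jean-Luc,26,2", "7,Will,54,307", "12,Keiko,54,253"]
print(sorted(average_friends_by_age(sample).items()))  # [(26, 2.0), (33, 385.0), (54, 280.0)]
```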

New RDD

In [None]:
tweetsRDD = sc.textFile("movietweets.csv")

In [None]:
tweetsRDD.take(5)

['positive,The Da Vinci Code book is just awesome.',
 'positive,i liked the Da Vinci Code a lot.',
 'positive,i liked the Da Vinci Code a lot.',
 "positive,I liked the Da Vinci Code but it ultimatly didn't seem to hold it's own.",
 "positive,that's not even an exaggeration ) and at midnight we went to Wal-Mart to buy the Da Vinci Code"]

In [None]:
ucRDD = tweetsRDD.map(lambda x: x.upper())
ucRDD.take(5)

['POSITIVE,THE DA VINCI CODE BOOK IS JUST AWESOME.',
 'POSITIVE,I LIKED THE DA VINCI CODE A LOT.',
 'POSITIVE,I LIKED THE DA VINCI CODE A LOT.',
 "POSITIVE,I LIKED THE DA VINCI CODE BUT IT ULTIMATLY DIDN'T SEEM TO HOLD IT'S OWN.",
 "POSITIVE,THAT'S NOT EVEN AN EXAGGERATION ) AND AT MIDNIGHT WE WENT TO WAL-MART TO BUY THE DA VINCI CODE"]

In [None]:
tweetsRDD.count()

100
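
Each line of `movietweets.csv` appears to follow the pattern `label,text`. A sketch, assuming that layout and that the label itself contains no commas, of separating the label from the tweet; splitting with `maxsplit=1` keeps any commas inside the text. In the notebook the same function could be applied with `tweetsRDD.map(split_tweet)`, and labels could be counted with `tweetsRDD.map(lambda x: split_tweet(x)[0]).countByValue()` (`split_tweet` is a hypothetical name):

```python
from collections import Counter

def split_tweet(line):
    # Split only on the first comma, assuming the label has no commas
    label, text = line.split(",", 1)
    return label, text

sample = [
    "positive,The Da Vinci Code book is just awesome.",
    "positive,i liked the Da Vinci Code a lot.",
    "positive,I liked the Da Vinci Code but it ultimatly didn't seem to hold it's own.",
]
print(split_tweet(sample[1]))  # ('positive', 'i liked the Da Vinci Code a lot.')
print(Counter(split_tweet(line)[0] for line in sample))  # Counter({'positive': 3})
```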

Otras operaciones "auto-data"

In [None]:
autoData = sc.textFile("auto-data.csv")
autoData.cache()
#Loads only now.
print(autoData.count())
print(autoData.first())
print(autoData.take(5))
new_rdd = sc.parallelize(autoData.take(5))

198
MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE
['MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE', 'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118', 'chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151', 'mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195', 'toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348']


In [None]:
for line in new_rdd.collect():
    print(line)

MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE
subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118
chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151
mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195
toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348


Save to a local file. First collect the RDD to the driver, then write it to a local file:

In [None]:
# collect() brings every element to the driver, so this is only suitable for small RDDs
with open("auto-data-saved.csv", "w") as autoDataFile:
    autoDataFile.write("\n".join(new_rdd.collect()))

In [None]:
!cat auto-data-saved.csv

MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE
subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118
chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151
mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195
toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348

# **Transformations**

Map to create a new RDD

In [None]:
tsvData = autoData.map(lambda x: x.replace(",", "\t"))
tsvData.take(5)

['MAKE\tFUELTYPE\tASPIRE\tDOORS\tBODY\tDRIVE\tCYLINDERS\tHP\tRPM\tMPG-CITY\tMPG-HWY\tPRICE',
 'subaru\tgas\tstd\ttwo\thatchback\tfwd\tfour\t69\t4900\t31\t36\t5118',
 'chevrolet\tgas\tstd\ttwo\thatchback\tfwd\tthree\t48\t5100\t47\t53\t5151',
 'mazda\tgas\tstd\ttwo\thatchback\tfwd\tfour\t68\t5000\t30\t31\t5195',
 'toyota\tgas\tstd\ttwo\thatchback\tfwd\tfour\t62\t4800\t35\t39\t5348']
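
Replacing every comma works for `auto-data.csv` because none of its values appear to contain commas. If a file had quoted fields with embedded commas, a sketch using Python's `csv` module would keep them intact; it could be applied with `autoData.map(csv_line_to_tsv)` (the function name is hypothetical):

```python
import csv

def csv_line_to_tsv(line):
    # csv.reader honours quoted fields, so a value such as "a,b"
    # stays a single column instead of being split on its comma
    fields = next(csv.reader([line]))
    return "\t".join(fields)

print(csv_line_to_tsv('x,"a,b",y'))
```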

Filter to create a new RDD

In [None]:
toyotaData = autoData.filter(lambda x: "toyota" in x)
toyotaData.count()

32
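
`"toyota" in x` matches the substring anywhere in the line. For this data it is probably equivalent, but a stricter check compares only the MAKE column. A sketch, usable as `autoData.filter(lambda x: is_make(x, "toyota"))` (`is_make` is a hypothetical helper):

```python
def is_make(line, make):
    # Compare only the first column (MAKE) instead of searching the whole line
    return line.split(",", 1)[0] == make

rows = [
    "MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE",
    "toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348",
    "subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118",
]
print([row for row in rows if is_make(row, "toyota")])
```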

FlatMap: https://sparkbyexamples.com/spark/spark-flatmap-usage-with-example/

In [None]:
words = toyotaData.flatMap(lambda line: line.split(","))
words.count()
words.take(20)

['toyota',
 'gas',
 'std',
 'two',
 'hatchback',
 'fwd',
 'four',
 '62',
 '4800',
 '35',
 '39',
 '5348',
 'toyota',
 'gas',
 'std',
 'two',
 'hatchback',
 'fwd',
 'four',
 '62']
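
The difference between `map` and `flatMap` can be seen with plain Python lists: `toyotaData.map(lambda line: line.split(","))` would produce one list per line, while `flatMap` flattens those lists, which is why `words.take(20)` above returns individual fields. A sketch on two shortened lines:

```python
lines = ["toyota,gas,std", "subaru,gas,std"]

# map: one output element per input line (here, a list per line)
mapped = [line.split(",") for line in lines]

# flatMap: the per-line lists are flattened into a single sequence
flat_mapped = [word for line in lines for word in line.split(",")]

print(mapped)       # [['toyota', 'gas', 'std'], ['subaru', 'gas', 'std']]
print(flat_mapped)  # ['toyota', 'gas', 'std', 'subaru', 'gas', 'std']
```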

Set operations

In [None]:
words1 = sc.parallelize(["hello","war","peace","world"])
words2 = sc.parallelize(["war","peace","universe"])

for unions in words1.union(words2).distinct().collect():
    print(unions)
print("------------------")
for intersects in words1.intersection(words2).collect():
    print(intersects)

hello
peace
world
universe
war
------------------
peace
war
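
Spark's `union` behaves like list concatenation (duplicates are kept), hence the `distinct()` call; `intersection` returns each common element once. Plain-Python equivalents with the same words are shown below. Spark does not guarantee the output order, so the sketch sorts the results:

```python
words1 = ["hello", "war", "peace", "world"]
words2 = ["war", "peace", "universe"]

# union() keeps duplicates, like list concatenation; distinct() removes them
union_all = words1 + words2
union_distinct = set(union_all)

# intersection() returns the distinct elements present in both inputs
common = set(words1) & set(words2)

print(len(union_all))          # 7
print(sorted(union_distinct))  # ['hello', 'peace', 'universe', 'war', 'world']
print(sorted(common))          # ['peace', 'war']
```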


Using functions to clean and transform an RDD

In [None]:
def cleanseRDD(autoStr):
    # Leave integer values unchanged; only strings (CSV lines) are processed
    if isinstance(autoStr, int):
        return autoStr
    attList = autoStr.split(",")
    # Convert DOORS (column 3) to a number string
    if attList[3] == "two":
        attList[3] = "2"
    elif attList[3] == "four":
        attList[3] = "4"
    # Convert DRIVE (column 5) to uppercase
    attList[5] = attList[5].upper()
    return ",".join(attList)

cleanedData = autoData.map(cleanseRDD)

for line in autoData.take(5):
  print(line)
print("------------------")
for line in cleanedData.take(5):
  print(line)

MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE
subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118
chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151
mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195
toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348
------------------
MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE
subaru,gas,std,2,hatchback,FWD,four,69,4900,31,36,5118
chevrolet,gas,std,2,hatchback,FWD,three,48,5100,47,53,5151
mazda,gas,std,2,hatchback,FWD,four,68,5000,30,31,5195
toyota,gas,std,2,hatchback,FWD,four,62,4800,35,39,5348
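
A lookup table makes the door mapping easier to extend if other words appear in the DOORS column. A sketch equivalent to `cleanseRDD` for string input (`DOOR_WORDS` and `cleanse_line` are illustrative names); it could be used as `autoData.map(cleanse_line)`:

```python
DOOR_WORDS = {"two": "2", "four": "4"}

def cleanse_line(line):
    fields = line.split(",")
    # DOORS (index 3): map number words to digits, leave anything else (e.g. the header) as-is
    fields[3] = DOOR_WORDS.get(fields[3], fields[3])
    # DRIVE (index 5): convert to uppercase
    fields[5] = fields[5].upper()
    return ",".join(fields)

print(cleanse_line("subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118"))
# subaru,gas,std,2,hatchback,FWD,four,69,4900,31,36,5118
```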


# New: Spark DataFrames

In [30]:
archivo = './sample_data/california_housing_train.csv'
df_spark = spark.read.csv(archivo, inferSchema=True, header=True)

# print the type of the resulting object
print(type(df_spark))

<class 'pyspark.sql.dataframe.DataFrame'>


In [31]:
df_spark.count()

17000

In [32]:
df_spark.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [33]:
df_spark.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [34]:
df_spark.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [35]:
df_spark.describe().toPandas().transpose()

|  | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| summary | count | mean | stddev | min | max |
| longitude | 17000 | -119.56210823529375 | 2.0051664084260357 | -124.35 | -114.31 |
| latitude | 17000 | 35.6252247058827 | 2.1373397946570867 | 32.54 | 41.95 |
| housing_median_age | 17000 | 28.58935294117647 | 12.586936981660406 | 1.0 | 52.0 |
| total_rooms | 17000 | 2643.664411764706 | 2179.947071452777 | 2.0 | 37937.0 |
| total_bedrooms | 17000 | 539.4108235294118 | 421.4994515798648 | 1.0 | 6445.0 |
| population | 17000 | 1429.5739411764705 | 1147.852959159527 | 3.0 | 35682.0 |
| households | 17000 | 501.2219411764706 | 384.5208408559016 | 1.0 | 6082.0 |
| median_income | 17000 | 3.883578100000021 | 1.9081565183791036 | 0.4999 | 15.0001 |
| median_house_value | 17000 | 207300.91235294117 | 115983.76438720895 | 14999.0 | 500001.0 |


In [36]:
df_spark.describe(['median_house_value']).show()

+-------+------------------+
|summary|median_house_value|
+-------+------------------+
|  count|             17000|
|   mean|207300.91235294117|
| stddev|115983.76438720895|
|    min|           14999.0|
|    max|          500001.0|
+-------+------------------+
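
`describe()` reports count, mean, standard deviation, min and max. The `stddev` value is the sample standard deviation (denominator n - 1; in Spark SQL `stddev` is an alias of `stddev_samp`). As a small cross-check of these definitions, here they are computed with Python's `statistics` module on the five `median_house_value` entries visible in the `show()` output above, not on the full 17,000 rows:

```python
import statistics

# First five median_house_value rows shown by df_spark.show()
values = [66900.0, 80100.0, 85700.0, 73400.0, 65500.0]

print(len(values))               # count: 5
print(statistics.mean(values))   # mean: 74320.0
print(statistics.stdev(values))  # sample standard deviation (n - 1 denominator)
print(min(values), max(values))  # min and max: 65500.0 85700.0
```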



Another example: reading a CSV from a URL with `SparkFiles`

In [46]:
url = "https://raw.githubusercontent.com/MateoBaron2/Spark/main/movie.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)
df_movie = spark.read.csv("file://"+SparkFiles.get("movie.csv"), header=True, inferSchema= True)

In [47]:
df_movie.count()

5043

In [48]:
df_movie.printSchema()

root
 |-- color: string (nullable = true)
 |-- director_name: string (nullable = true)
 |-- num_critic_for_reviews: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- director_facebook_likes: integer (nullable = true)
 |-- actor_3_facebook_likes: integer (nullable = true)
 |-- actor_2_name: string (nullable = true)
 |-- actor_1_facebook_likes: integer (nullable = true)
 |-- gross: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- actor_1_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- num_voted_users: integer (nullable = true)
 |-- cast_total_facebook_likes: integer (nullable = true)
 |-- actor_3_name: string (nullable = true)
 |-- facenumber_in_poster: integer (nullable = true)
 |-- plot_keywords: string (nullable = true)
 |-- movie_imdb_link: string (nullable = true)
 |-- num_user_for_reviews: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- country: string (nullable = true)
 |-- content_rati

In [49]:
df_movie.columns

['color',
 'director_name',
 'num_critic_for_reviews',
 'duration',
 'director_facebook_likes',
 'actor_3_facebook_likes',
 'actor_2_name',
 'actor_1_facebook_likes',
 'gross',
 'genres',
 'actor_1_name',
 'movie_title',
 'num_voted_users',
 'cast_total_facebook_likes',
 'actor_3_name',
 'facenumber_in_poster',
 'plot_keywords',
 'movie_imdb_link',
 'num_user_for_reviews',
 'language',
 'country',
 'content_rating',
 'budget',
 'title_year',
 'actor_2_facebook_likes',
 'imdb_score',
 'aspect_ratio',
 'movie_facebook_likes']

In [51]:
df_movie.show()

+-----+-----------------+----------------------+--------+-----------------------+----------------------+--------------------+----------------------+---------+--------------------+------------------+--------------------+---------------+-------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+-------+--------------+---------+----------+----------------------+----------+------------+--------------------+
|color|    director_name|num_critic_for_reviews|duration|director_facebook_likes|actor_3_facebook_likes|        actor_2_name|actor_1_facebook_likes|    gross|              genres|      actor_1_name|         movie_title|num_voted_users|cast_total_facebook_likes|        actor_3_name|facenumber_in_poster|       plot_keywords|     movie_imdb_link|num_user_for_reviews|language|country|content_rating|   budget|title_year|actor_2_facebook_likes|imdb_score|aspect_ratio|movie_facebook_likes|
+-----+-----------------

In [52]:
df_movie.describe().toPandas().transpose()

|  | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| summary | count | mean | stddev | min | max |
| color | 5024 |  |  | Black and White | Color |
| director_name | 4939 |  |  | A. Raven Cruz | Étienne Faure |
| num_critic_for_reviews | 4993 | 140.1942719807731 | 121.60167539623127 | 1 | 813 |
| duration | 5028 | 107.2010739856802 | 25.19744080882414 | 7 | 511 |
| director_facebook_likes | 4939 | 686.5092123911724 | 2813.328606865674 | 0 | 23000 |
| actor_3_facebook_likes | 5020 | 645.0097609561753 | 1665.0417284458563 | 0 | 23000 |
| actor_2_name | 5030 |  |  | 50 Cent | Zubaida Sahar |
| actor_1_facebook_likes | 5036 | 6560.04706115965 | 15020.759119984074 | 0 | 640000 |
| gross | 4159 | 4.846840752680933E7 | 6.84529904387528E7 | 162 | 760505847 |


In [53]:
df_movie.describe(['duration']).show()

+-------+-----------------+
|summary|         duration|
+-------+-----------------+
|  count|             5028|
|   mean|107.2010739856802|
| stddev|25.19744080882414|
|    min|                7|
|    max|              511|
+-------+-----------------+



Another way to read a CSV: load it with pandas and convert it to a Spark DataFrame

In [54]:
import pandas as pd

url_github = 'https://raw.githubusercontent.com/AISCIENCES/course-master-big-data-with-pyspark-and-aws/main/Code/03-Spark%20DFs/StudentData.csv'


pd_df = pd.read_csv(url_github)
spark_df = spark.createDataFrame(pd_df)

spark_df.limit(5).show()

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB|32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA|41487|   41|Claude Panos_Judi...|
+---+------+----------------+------+-----+-----+--------------------+



In [55]:
url_github_amigos = 'https://raw.githubusercontent.com/MateoBaron2/Spark/main/amigos.csv'


pd_df = pd.read_csv(url_github_amigos)
spark_df_amigos = spark.createDataFrame(pd_df)

spark_df_amigos.limit(5).show()

+---+--------+---+---+
|  0|    Will| 33|385|
+---+--------+---+---+
|  1|Jean-Luc| 26|  2|
|  2|    Hugh| 55|221|
|  3|  Deanna| 40|465|
|  4|   Quark| 68| 21|
|  5|  Weyoun| 59|318|
+---+--------+---+---+
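
`amigos.csv` has no header row, so `pd.read_csv` takes the first record (`0,Will,33,385`) as column names. That is why the table above shows `0`, `Will`, `33` and `385` as headers and its first data row is `Jean-Luc`. A sketch of reading such a file with explicit column names; the names `id`, `name`, `age`, `numFriends` are assumptions based on how `parseLine` uses the fields, and an in-memory string stands in for the GitHub URL:

```python
import io
import pandas as pd

# Stand-in for the first lines of amigos.csv
raw = io.StringIO("0,Will,33,385\n1,Jean-Luc,26,2\n2,Hugh,55,221\n")

# header=None keeps the first line as data; names= sets the (assumed) column labels
pd_amigos = pd.read_csv(raw, header=None, names=["id", "name", "age", "numFriends"])
print(pd_amigos)
```

In the notebook this would become `pd.read_csv(url_github_amigos, header=None, names=[...])`, followed by `spark.createDataFrame(...)` as before.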



In [57]:
# Convert the DataFrame to an RDD of Row objects
rdd_amigos=spark_df_amigos.rdd

print(rdd_amigos.collect())

[Row(0=1, Will='Jean-Luc', 33=26, 385=2), Row(0=2, Will='Hugh', 33=55, 385=221), Row(0=3, Will='Deanna', 33=40, 385=465), Row(0=4, Will='Quark', 33=68, 385=21), Row(0=5, Will='Weyoun', 33=59, 385=318), Row(0=6, Will='Gowron', 33=37, 385=220), Row(0=7, Will='Will', 33=54, 385=307), Row(0=8, Will='Jadzia', 33=38, 385=380), Row(0=9, Will='Hugh', 33=27, 385=181), Row(0=10, Will='Odo', 33=53, 385=191), Row(0=11, Will='Ben', 33=57, 385=372), Row(0=12, Will='Keiko', 33=54, 385=253), Row(0=13, Will='Jean-Luc', 33=56, 385=444), Row(0=14, Will='Hugh', 33=43, 385=49), Row(0=15, Will='Rom', 33=36, 385=49), Row(0=16, Will='Weyoun', 33=22, 385=323), Row(0=17, Will='Odo', 33=35, 385=13), Row(0=18, Will='Jean-Luc', 33=45, 385=455), Row(0=19, Will='Geordi', 33=60, 385=246), Row(0=20, Will='Odo', 33=67, 385=220), Row(0=21, Will='Miles', 33=19, 385=268), Row(0=22, Will='Quark', 33=30, 385=72), Row(0=23, Will='Keiko', 33=51, 385=271), Row(0=24, Will='Julian', 33=25, 385=1), Row(0=25, Will='Ben', 33=21, 38

In [59]:
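# Note: this cell reuses `lines` (read from amigos.csv with sc.textFile in the
# first exercise), so it reproduces the earlier results; `rdd_amigos` is not used here.
rdd = lines.map(parseLine)
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))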
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect()
for result in results:
    print(result)

(26, 242.05882352941177)
(40, 250.8235294117647)
(68, 269.6)
(54, 278.0769230769231)
(38, 193.53333333333333)
(56, 306.6666666666667)
(36, 246.6)
(22, 206.42857142857142)
(60, 202.71428571428572)
(30, 235.8181818181818)
(42, 303.5)
(48, 281.4)
(50, 254.6)
(32, 207.9090909090909)
(58, 116.54545454545455)
(64, 281.3333333333333)
(52, 340.6363636363636)
(24, 233.8)
(20, 165.0)
(62, 220.76923076923077)
(44, 282.1666666666667)
(28, 209.1)
(66, 276.44444444444446)
(46, 223.69230769230768)
(18, 343.375)
(34, 245.5)
(33, 325.3333333333333)
(55, 295.53846153846155)
(59, 220.0)
(37, 249.33333333333334)
(27, 228.125)
(53, 222.85714285714286)
(57, 258.8333333333333)
(43, 230.57142857142858)
(35, 211.625)
(45, 309.53846153846155)
(67, 214.625)
(19, 213.27272727272728)
(51, 302.14285714285717)
(25, 197.45454545454547)
(21, 350.875)
(49, 184.66666666666666)
(39, 169.28571428571428)
(31, 267.25)
(41, 268.55555555555554)
(69, 235.2)
(65, 298.2)
(61, 256.22222222222223)
(29, 215.91666666666666)
(47, 233
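
Once the data is in a DataFrame with proper column names, the same per-age average can be written as a group-by; in PySpark something like `spark_df_amigos.groupBy("age").avg("numFriends")`, assuming the columns were named as in the header fix above. A pandas sketch of those semantics, on a few records shown earlier in this notebook:

```python
import pandas as pd

# A few (age, numFriends) records taken from the outputs above
df = pd.DataFrame({"age": [54, 54, 26, 33], "numFriends": [307, 253, 2, 385]})

# groupby + mean computes the same per-age average as
# mapValues((x, 1)) -> reduceByKey(sum) -> mapValues(total / count)
averages = df.groupby("age")["numFriends"].mean()
result = {int(age): float(avg) for age, avg in averages.items()}
print(result)  # {26: 2.0, 33: 385.0, 54: 280.0}
```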