In [2]:
import os, sys
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Build (or reuse) the SparkSession for this notebook.
# A SparkContext comes with the session, so none has to be created by hand.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("ETL")
    .getOrCreate()
)

# Extra settings can be chained on the builder before getOrCreate(), e.g.
#     .config("spark.some.config.option", "some-value") \

# Spark's web UI is now reachable at http://localhost:4040 (default port)

In [None]:
# Configuring Spark's runtime properties
# (the original snippet was Scala; this is the PySpark equivalent)

# Set a new runtime option.
spark.conf.set("spark.sql.shuffle.partitions", "6")

# "spark.executor.memory" is a launch-time setting, not a runtime one: changing it
# on a live session has no effect, and newer Spark versions reject the call.
# Supply it when the session is built instead, e.g.
#     SparkSession.builder.config("spark.executor.memory", "2g")

# Get all settings of the current session as a {key: value} dict.
configMap = {row["key"]: row["value"] for row in spark.sql("SET").collect()}


In [None]:
# Creating Datasets and DataFrames
# (translated from Scala: no `val`, Python lists instead of List(...), keyword
# arguments instead of positional booleans)

# Create a DataFrame of ids with spark.range: from 5 up to (not including) 100, step 5.
numDS = spark.range(5, 100, 5)
# Reverse the order and display the first 5 items.
numDS.orderBy(desc("id")).show(5)
# Compute descriptive stats and display them.
numDS.describe().show()
# Create a DataFrame with spark.createDataFrame from a list of tuples.
# Without column names the columns default to "_1", "_2".
langPercentDF = spark.createDataFrame([("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)])
# Rename the columns.
lpDF = langPercentDF.withColumnRenamed("_1", "language").withColumnRenamed("_2", "percent")
# Order the DataFrame in descending order of percentage; do not truncate cell contents.
lpDF.orderBy(desc("percent")).show(truncate=False)



In [None]:
# Read the JSON file and create the DataFrame.
# The Scala original took the path from args(0); a notebook cell has no
# command-line arguments, so the path is set explicitly here.
jsonFile = "../../datasets/zips.json"  # TODO: placeholder - point this at your JSON file
zipsDF = spark.read.json(jsonFile)
# Show the first 10 rows whose "pop" column is greater than 40000.
zipsDF.filter(zipsDF["pop"] > 40000).show(10)


In [54]:
# Base location of the datasets; swap in the commented line to read local files instead.
#datasetDir = "../../datasets/" # local files
datasetDir = "hdfs://localhost:19000/" # hadoop filesystem


def int_schema(*names):
    """Return a StructType with one IntegerType column per name, in the given order."""
    return StructType([StructField(name, IntegerType()) for name in names])


def read_higgs(file_name, schema):
    """Read one space-separated HiggsTwitter file under datasetDir with an explicit schema.

    An explicit schema is passed so Spark does not need to infer column types.
    """
    return spark.read.csv(path=datasetDir + "HiggsTwitter/" + file_name, sep=" ", schema=schema)


# follower -> followed edges
socialDF = read_higgs("higgs-social_network.edgelist.gz",
                      int_schema("follower", "followed"))

# The three interaction networks share the layout (source user, target user, occurrences).
retweetDF = read_higgs("higgs-retweet_network.edgelist.gz",
                       int_schema("tweeter", "tweeted", "occur"))
replyDF = read_higgs("higgs-reply_network.edgelist.gz",
                     int_schema("replier", "replied", "occur"))
mentionDF = read_higgs("higgs-mention_network.edgelist.gz",
                       int_schema("mentioner", "mentioned", "occur"))

# Interaction can be RT (retweet), MT (mention) or RE (reply)
activitySchema = StructType([
    StructField("userA", IntegerType()),
    StructField("userB", IntegerType()),
    StructField("timestamp", IntegerType()),
    StructField("interaction", StringType()),
])
activityDF = read_higgs("higgs-activity_time.txt.gz", activitySchema)


In [55]:
# Print the schema as a tree: column name, type and nullability.
socialDF.printSchema()

# The same schema as a StructType object; being the last expression, it is echoed as the cell result.
socialDF.schema

root
 |-- follower: integer (nullable = true)
 |-- followed: integer (nullable = true)



StructType(List(StructField(follower,IntegerType,true),StructField(followed,IntegerType,true)))

In [56]:
%%time
# Preview the first 3 rows of every DataFrame, in loading order.
for frame in (socialDF, retweetDF, replyDF, mentionDF, activityDF):
    frame.show(3)


+--------+--------+
|follower|followed|
+--------+--------+
|       1|       2|
|       1|       3|
|       1|       4|
+--------+--------+
only showing top 3 rows

+-------+-------+-----+
|tweeter|tweeted|occur|
+-------+-------+-----+
| 298960| 105232|    1|
|  95688|   3393|    1|
| 353237|  62217|    1|
+-------+-------+-----+
only showing top 3 rows

+-------+-------+-----+
|replier|replied|occur|
+-------+-------+-----+
| 161345|   8614|    1|
| 428368|  11792|    1|
|  77904|  10701|    1|
+-------+-------+-----+
only showing top 3 rows

+---------+---------+-----+
|mentioner|mentioned|occur|
+---------+---------+-----+
|   316609|     5011|    1|
|   439696|    12389|    1|
|    60059|     6929|    1|
+---------+---------+-----+
only showing top 3 rows

+------+------+----------+-----------+
| userA| userB| timestamp|interaction|
+------+------+----------+-----------+
|223789|213163|1341100972|         MT|
|223789|213163|1341100972|         RE|
|376989| 50329|1341101181|       

## Spark SQL using DataFrame API

In [46]:
%%time

# User who follows most users: per follower, count the accounts they follow.
socialDF.groupBy("follower").agg(count("followed").alias("follows")).orderBy(desc("follows")).show(5)

# User who has most followers: per followed account, count its followers.
# The original counted the grouping column itself ("followed"); counting "follower"
# matches the SQL version of this query and what the "followers" alias means.
socialDF.groupBy("followed").agg(count("follower").alias("followers")).orderBy(desc("followers")).show(5)


+--------+-------+
|follower|follows|
+--------+-------+
|   13115|   1259|
|   49180|   1155|
|   50338|   1097|
|    1984|   1093|
|    3628|   1027|
+--------+-------+
only showing top 5 rows

+--------+---------+
|followed|followers|
+--------+---------+
|    1503|    51386|
|     206|    48414|
|      88|    45221|
|     138|    44188|
|    1062|    40120|
+--------+---------+
only showing top 5 rows

Wall time: 20.7 s


## Spark SQL using SQL language

In [12]:
%%time
# count() is an action: it runs a job over the social edge list and returns the number of rows.
socialDF.count()

Wall time: 7.16 s


14855842

In [43]:
%%time
# Register the DataFrame as a temporary view so it can be queried by name from SQL.
# This must run before the queries below; with it commented out, a fresh kernel
# fails because the "social" view does not exist.
socialDF.createOrReplaceTempView("social")

# User who follows most users
spark.sql("select follower, count(followed) as follows from social group by follower order by count(followed) desc").show(5)

# User who has most followers
spark.sql("select followed, count(follower) as followers from social group by followed order by count(follower) desc").show(5)


+--------+-------+
|follower|follows|
+--------+-------+
|   13115|   1259|
|   49180|   1155|
|   50338|   1097|
|    1984|   1093|
|    3628|   1027|
+--------+-------+
only showing top 5 rows

+--------+---------+
|followed|followers|
+--------+---------+
|    1503|    51386|
|     206|    48414|
|      88|    45221|
|     138|    44188|
|    1062|    40120|
+--------+---------+
only showing top 5 rows

Wall time: 20.4 s


In [None]:
import os, sys
from pyspark import SparkContext, SparkConf, SQLContext
appName = "miApp"
master="local[*]"

conf = SparkConf().setAppName(appName).setMaster(master)
# Only one SparkContext may be active at a time. Calling SparkContext(conf=conf)
# raises an error if a context is already running (for example the one owned by a
# SparkSession created earlier), so reuse the active context when there is one.
# When an existing context is reused, `conf` is not applied to it.
sc = SparkContext.getOrCreate(conf=conf) # start (or reuse) the SparkContext

# now we can go to http://localhost:4040 (default port) in order to see Spark's web UI

In [None]:
#print(os.getcwd())
# Open the World Bank Millennium Development Goals CSV as an RDD of text lines.
# The path is relative, so it depends on the notebook's working directory.
textFile = sc.textFile("../../datasets/worldbank-millennium-development-goals.csv")

# datasets at:
#https://datasource.kapsarc.org/explore/dataset/worldbank-millennium-development-goals/export/
#https://datasource.kapsarc.org/explore/dataset/worldbank-health-nutrition-and-population-statistics/export/

In [None]:
#print(os.getcwd())
# Open each HiggsTwitter file as an RDD of raw text lines (one RDD per file).
fSocial, fRetweet, fReply, fMention, fActivity = map(sc.textFile, [
    "../../datasets/HiggsTwitter/higgs-social_network.edgelist.gz",
    "../../datasets/HiggsTwitter/higgs-retweet_network.edgelist.gz",
    "../../datasets/HiggsTwitter/higgs-reply_network.edgelist.gz",
    "../../datasets/HiggsTwitter/higgs-mention_network.edgelist.gz",
    "../../datasets/HiggsTwitter/higgs-activity_time.txt.gz",
])

# datasets at http://snap.stanford.edu/data/higgs-twitter.html

In [None]:
# Print the first 3 raw lines of every RDD, in loading order.
for rdd in (fSocial, fRetweet, fReply, fMention, fActivity):
    print(rdd.take(3))

In [None]:
# Larger sample: the first 40 raw lines of the social edge list.
print(fSocial.take(40))


In [None]:
# Load CSV files with a header row, letting Spark infer the column types.
# `sqlContext` is never defined in this notebook, and `load(...)` with the external
# "com.databricks.spark.csv" source is the obsolete pre-2.0 way of reading CSV;
# the session's built-in CSV reader replaces both.
# 'PATH' is a placeholder - replace it with the real directory.
social = spark.read.csv('PATH/train.csv', header=True, inferSchema=True)
test = spark.read.csv('PATH/test-comb.csv', header=True, inferSchema=True)



In [None]:
# Turn the raw-text RDD into a DataFrame.
# createDataFrame lives on the SparkSession, not on the SparkContext, and it needs
# structured rows rather than plain strings, so each line is first split on the
# space separator and its fields converted to integers.
spark.createDataFrame(
    fSocial.map(lambda line: tuple(int(value) for value in line.split(" "))),
    ["follower", "followed"],
)


# RDD


In [None]:
print(textFile.count()) # Number of items (lines) in this RDD

textFile.first()  # First item in this RDD; shown as the cell result

In [None]:
# Count the lines that contain the substring "Spain".
(
    textFile
    .filter(lambda text: "Spain" in text)
    .count()
)

In [None]:
# Largest number of whitespace-separated words on any single line (lambda format).
# Note: the result is the word count itself, not the line.
(
    textFile
    .map(lambda text: len(text.split()))
    .reduce(lambda left, right: left if (left > right) else right)
)

In [None]:
# find the line with the most words (function format)
def max(a, b):
    """Return the greater of a and b; when neither is greater, return b.

    NOTE: this definition shadows Python's builtin max() for every cell run
    after this one.
    """
    return a if a > b else b


# Same reduction as the lambda version, passing the named two-argument function instead.
(
    textFile
    .map(lambda text: len(text.split()))
    .reduce(max)
)

In [None]:
# MapReduce-style word count, the data flow popularized by Hadoop:
# split lines into words, emit (word, 1) for each, then sum the ones per word.
wordCounts = (
    textFile
    .flatMap(lambda text: text.split())
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left+right)
)

In [None]:
# Show 10 (word, count) pairs; take() fetches only a sample rather than the full result.
wordCounts.take(10)
#wordCounts.collect()

In [None]:
#textFile.first() # returns first item
# First 3 raw lines of the file.
textFile.take(3)

In [None]:
# map keeps one result per line: this returns a list of 3 lists, each holding one line's ';'-separated fields.
(
    textFile
    .map(lambda row: row.split(';'))
    .take(3)
)

In [None]:
# flatMap flattens the per-line field lists, so this returns the first 8 individual field values.
(
    textFile
    .flatMap(lambda row: row.split(';'))
    .take(8)
)

In [None]:
# Working example: count how often each ';'-separated field value occurs in the file.
fieldcount = (
    textFile
    .flatMap(lambda line: line.split(';'))
    .map(lambda field: (field, 1))
    .reduceByKey(lambda a, b: a+b)
)
# Display the result built in this cell (the original showed wordCounts by mistake).
fieldcount.take(10)

In [None]:
# Get the year of each record as a (year, 1) pair.
# Fixes over the original cell: `lines` was never defined (the RDD is `textFile`),
# and `x[0].split(';')` split only the first character of each line instead of
# the whole line.
# NOTE(review): field index 4 comes from the original cell and is assumed to hold
# the year - confirm against the CSV header.
years = (
    textFile
    .map(lambda line: line.split(';'))
    .filter(lambda fields: len(fields) > 4)  # skip rows too short to have field 4
    .map(lambda fields: (fields[4], 1))
)
years.take(5)
#counts = years.reduceByKey(lambda a, b: a + b)

#print(counts.collect()) # TODO: fix it



In [None]:
#EXAMPLE
# Count-by-key pattern: emit (key, 1) per record, sum the ones per key, then
# collect and print the (key, count) pairs.
# NOTE(review): `people` is not defined anywhere in the visible notebook, so this
# cell raises NameError as written - presumably an RDD of indexable records with
# at least 5 fields; TODO confirm or define it first.
pairs = people.map(lambda x: (x[4], 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
print(counts.collect())

# DataFrame

# Dataset