In [1]:
#Palindrome words

#Library to download web pages
import requests

# Download the word list and stop on an HTTP error, so that an error page is
# never processed as if it were data.
response = requests.get('https://raw.githubusercontent.com/forons/BigDataExamples/master/files/data.txt')
response.raise_for_status()

# Decode the body and split it into text lines. iter_lines() yields bytes on
# Python 3 and is a one-shot generator; a list of str can be parallelized
# repeatedly and compared character by character.
file_content = response.text.splitlines()

def Palindrome(s):
  """Return True when the sequence `s` equals its own reversal."""
  mirrored = s[::-1]
  return s == mirrored

def reverse(s):
  """Return `s` reversed, as the same sequence type that was passed in.

  A reversed slice replaces the character-by-character concatenation: it is
  linear instead of quadratic and it also works for bytes, whose elements are
  ints on Python 3 and therefore cannot be concatenated to a str.
  """
  return s[::-1]

# Distribute the downloaded lines as an RDD.
words = sc.parallelize(file_content)

# Keep the entries that read the same in both directions.
# Palindrome(word) expresses the same check with a slice.
palindroms = words.filter(lambda word: word == reverse(word))
palindroms.collect()

In [2]:
#Words that occur exactly 5 times
import requests
from operator import add

# Retrieve the file from the source and stop on an HTTP error, so that an
# error page is never counted as text.
response = requests.get('https://raw.githubusercontent.com/forons/BigDataExamples/master/files/inferno.txt')
response.raise_for_status()

# Decode to text lines: iter_lines() yields bytes on Python 3, and splitting
# bytes with a str separator raises TypeError.
file_content = response.text.splitlines()

# Parallelize the content of the file into the RDD `lines`.
lines = sc.parallelize(file_content)

# Tokenise on any run of whitespace. split(' ') would emit '' for blank lines
# and repeated spaces, and the empty string would then be counted as a word.
words = lines.flatMap(lambda x: x.split())

# Classic word count: (word, 1) pairs summed per word.
word_pairs = words.map(lambda x: (x, 1))
word_count = word_pairs.reduceByKey(add)

# Words that occur exactly 5 times.
word_count.filter(lambda x: x[1] == 5).collect()



In [3]:
# Group the (word, count) pairs by their count and bring the groups to the driver.
result = word_count.groupBy(lambda pair: pair[1]).collect()

# Printed before every group to keep the groups visually apart.
separator = '#####'

for occurrences, group in result:
  print(separator)
  for pair in group:
    print(pair, occurrences)

In [4]:
# Location of the uploaded tweets file.
path = '/FileStore/tables/tweets_cleaned.csv'

# Load the CSV, taking the column names from its first row.
df = spark.read.option('header', 'true').csv(path)
df.printSchema()

# How many times each person tweeted.
counts = df.groupBy('user_name').count()
counts.printSchema()

from pyspark.sql.functions import desc

# Most active users first.
counts.orderBy(desc('count')).show()

In [5]:
# Read the tweets file as an RDD of raw text lines. The path is absolute, like
# every other /FileStore/tables reference in this notebook, so it does not
# depend on the working directory of the file system.
rdd = sc.textFile('/FileStore/tables/tweets_cleaned.csv')
# NOTE: collect() brings every line to the driver.
rdd.collect()

In [6]:
# Load the people file. The path is absolute, like the other /FileStore/tables
# references in this notebook, so it does not depend on the working directory.
# inferSchema lets numeric columns be typed as numbers instead of strings;
# the literal text "null" is read as a missing value.
df = spark.read.csv('/FileStore/tables/people2.csv', inferSchema = 'true', header = 'true', nullValue = 'null')

df.show()
df.printSchema()

# Select the "name" column.
df.select("name").show()

# Select everybody, with the age incremented by 1.
df.select(df['name'], df['age'] + 1).show()

# Keep only people older than 21.
df.filter(df['age'] > 21).show()


In [7]:
from pyspark.sql import Row

# Load a text file and convert each line to a Row.
lines = sc.textFile("/FileStore/tables/people.txt")

parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=p[1]))

# Infer the schema, and register the DataFrame as a temporary view.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# Query the view under the name it was registered with ("people"); the
# previous "people_rdd" was never registered and made the query fail.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()

# The results of SQL queries are DataFrame objects.
# .rdd returns the content as an RDD of Row objects.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)

In [8]:
# Import data types
from pyspark.sql.types import *

# Read the text file and break every line into its comma-separated pieces.
lines = sc.textFile("/FileStore/tables/people.txt")
parts = lines.map(lambda row_text: row_text.split(","))

# Keep the first four pieces of each record as a plain tuple; only the second
# piece has its surrounding whitespace removed.
people = parts.map(lambda rec: (rec[0], rec[1].strip(), rec[2], rec[3]))

# Column names, space separated. Every column is declared as a nullable string.
schemaString = "name age city country"

fields = []
for field_name in schemaString.split():
    fields.append(StructField(field_name, StringType(), True))
schema = StructType(fields)

# Build the DataFrame from the tuples using the explicit schema.
schemaPeople = spark.createDataFrame(people, schema=schema)

# Expose the DataFrame to SQL under the name "people".
schemaPeople.createOrReplaceTempView("people")

# Run a SQL query against the registered view and display its rows.
results = spark.sql("SELECT name FROM people")
results.show()



In [9]:
from pyspark.streaming import StreamingContext
# Streaming context on top of the existing SparkContext, with a batch
# interval of 10 seconds.
ssc = StreamingContext(sc, 10)

# Text lines received from the socket server.
lines = ssc.socketTextStream("dkm3.disi.unitn.it", 9999)

# Word count applied to every batch.
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word,1))
# The DStream method is reduceByKey (lower-case "r"); "ReduceByKey" does not
# exist and raised AttributeError before the stream could start.
wordCounts = pairs.reduceByKey(lambda x, y : x + y)

# Print the first elements of every batch.
wordCounts.pprint()

# Start the computation and block until it is stopped.
ssc.start()
ssc.awaitTermination()


In [10]:
from pyspark.sql.functions import *

#1. Find average of grades for each student
course_name_credits = spark.read.csv('/FileStore/tables/course_name_credits.csv', inferSchema = 'true', header = 'true', nullValue = 'null')
course_name_grade = spark.read.csv('/FileStore/tables/student_course_grade.csv', inferSchema = 'true', header = 'true', nullValue = 'null')
student_name = spark.read.csv('/FileStore/tables/student_name.tsv', sep='\t', inferSchema = 'true', header = 'true')

# Grades joined with the course details and with the student names.
query = course_name_credits.join(course_name_grade, ["course"]).join(student_name, ["student"])
query.groupBy('name').avg('grade').show()


#2. Find out the maximum grade for each course, displaying its name
# Only the course details are needed here, so aggregate the course/grade join
# instead of the three-way join. The student join is an inner join and would
# silently drop grades whose student has no row in student_name.
courses = course_name_credits.join(course_name_grade, ["course"])
max_grade = courses.groupBy('course_name').max('grade')
# Without an action the aggregation is never executed.
max_grade.show()


#3. Decrease Marks by 4 and create new column by adding PASS or FAIL if >=18
course_name_grade = course_name_grade.withColumn('minus4', course_name_grade['grade']-4)
course_name_grade = course_name_grade.withColumn('passed', when(course_name_grade.minus4 >= 18, 'PASS').otherwise('FAIL'))
course_name_grade.show()

#4. Save into files: csv, json, parquet
# mode='overwrite' lets the cell be re-run: the default mode raises an error
# when the output path already exists. The CSV keeps its header row so it can
# be read back the same way the inputs are read.
course_name_grade.write.csv('/FileStore/tables/output_test_csv', mode='overwrite', header=True)
course_name_grade.write.json('/FileStore/tables/output_test_json', mode='overwrite')
course_name_grade.write.parquet("/FileStore/tables/output_test_parquet", mode='overwrite')




In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Load the adult dataset once; all three aggregations below read from it.
adult_data = spark.read.csv(
    '/FileStore/tables/adult.data',
    inferSchema = 'true',
    header = 'true',
    nullValue = 'null',
)

#1) Number of people sharing each age
people_by_age = adult_data.groupBy('age')
group_by_age = people_by_age.count()
group_by_age.show()

#2) Average age for each marital status
people_by_marital = adult_data.groupBy('marital-status')
group_by_marital = people_by_marital.avg('age')
group_by_marital.show()

#3) Highest capital gain for each native country
people_by_country = adult_data.groupBy('native-country')
capital_gain = people_by_country.max('capital-gain')
capital_gain.show()

In [13]:
'''
4) Perform classification for predicting the 'class' column
  a. Create a vector of features of all the numerical columns ('fnlwgt', 'age', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week') through the VectorAssembler
  b. Create a "numerical conversion" of the 'class' column with the StringIndexer
  c. Split the dataset in train and test set
  d. Apply RandomForestClassifier algorithm for predicting the numerical conversion of the ‘class’ column
  e. Convert the prediction again into string with the IndexToString
  f. Evaluate the model with the MulticlassClassificationEvaluator
'''
# Import packages
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *

train = spark.read.csv('/FileStore/tables/adult.data', inferSchema = 'true', header = 'true', nullValue = 'null')
# Drop rows that contain null values.
train = train.dropna()

# Index labels, adding metadata to the label column.
# Fit on the whole dataset so that every class value receives an index, and
# keep the fitted model: its labels are needed to decode the predictions.
classIndexerModel = StringIndexer(inputCol="class", outputCol="indexedClass").fit(train)
train = classIndexerModel.transform(train)

# The label is not one-hot encoded: the classifier consumes the indexed label
# directly and nothing read the encoded column. Calling .transform on a bare
# OneHotEncoder also only works on Spark versions where it is a Transformer.

# Assemble the numerical columns into a single feature vector.
feature_cols = ["age", "fnlwgt", "education-num", "capital-gain", "capital-loss", "hours-per-week"]
train = VectorAssembler(inputCols=feature_cols, outputCol="features").transform(train)

# Split after indexing and assembling so both parts carry the label and
# feature columns. The seed makes the split reproducible.
(traindf, testdf) = train.randomSplit([0.7,0.3], seed=42)

rf = RandomForestClassifier(labelCol="indexedClass", featuresCol="features", numTrees = 10)

# Fit on the training part only and predict on the held-out part; fitting and
# scoring on the same rows would report training error as "Test Error".
model = rf.fit(traindf)

predictions = model.transform(testdf)

print("StringIndexer will store labels in output column metadata\n")

# Convert the predicted index back to its class string. The input must be the
# "prediction" column; converting "indexedClass" only echoes the true class.
converter = IndexToString(inputCol="prediction", outputCol="predictedClass", labels=classIndexerModel.labels)
converted = converter.transform(predictions)
converted.select(col("predictedClass"),col("class"),col("probability"),).show(20)

evaluator = MulticlassClassificationEvaluator(labelCol="indexedClass", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(converted)

print("Test Error = %g" % (1.0 - accuracy))


In [14]:
# Import packages
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *
 
train = spark.read.csv('/FileStore/tables/adult.data', inferSchema = 'true', header = 'true', nullValue = 'null')
# Drop rows that contain null values.
train = train.dropna()
# The seed makes the 70/30 split reproducible across runs.
(traindf, testdf) = train.randomSplit([0.7,0.3], seed=42)

# Index labels, adding metadata to the label column.
# As a pipeline stage the indexer is fitted on traindf only, so a class value
# that appears only in testdf would not receive an index.
classIndexer = StringIndexer(inputCol="class", outputCol="indexedClass")

# One-hot encoding of the indexed label. The forest does not read "classVec";
# it is only an extra column in the output.
classEncoder = OneHotEncoder(inputCol="indexedClass", outputCol="classVec")

# Assemble the numerical columns into a single feature vector.
assembler = VectorAssembler(inputCols=["age", "fnlwgt", "education-num", "capital-gain", "capital-loss", "hours-per-week"],outputCol="features")

# Random forest trained on the indexed label.
rf = RandomForestClassifier(labelCol="indexedClass", featuresCol="features")

# Chain indexer, encoder, assembler and forest in a Pipeline.
pipeline = Pipeline(stages=[classIndexer, classEncoder, assembler, rf])

# Train the model. This also fits the indexer (and the encoder, where it is an estimator).
model = pipeline.fit(traindf)

# Predictions on the held-out part.
predictions = model.transform(testdf)

# Convert the predicted index back to its class string. The input must be the
# "prediction" column; converting "indexedClass" only echoes the true class.
# Stage 0 of the fitted pipeline is the fitted StringIndexer, which holds the
# index -> label mapping.
converter = IndexToString(inputCol="prediction", outputCol="predictedClass", labels=model.stages[0].labels)
converted = converter.transform(predictions)
converted.select(col("predictedClass"),col("class"),col("probability"),).show(20)

evaluator = MulticlassClassificationEvaluator(labelCol="indexedClass", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(converted)

print("Test Error = %g" % (1.0 - accuracy))

In [15]:
data = spark.read.csv('/FileStore/tables/leukemia.dat', inferSchema = 'true', header = 'true', nullValue = 'null')
# The seed makes the 80/20 split reproducible across runs.
(traindf, testdf) = data.randomSplit([0.8,0.2], seed=42)

# DataFrameWriter has no "dat" format, so both parts are saved as CSV.
# header=True keeps the column names that were read from the input's first
# row. mode='overwrite' lets the cell be re-run: the default mode raises an
# error when the output path already exists.
traindf.write.csv('/FileStore/tables/data_train', mode='overwrite', header=True)
testdf.write.csv('/FileStore/tables/data_test', mode='overwrite', header=True)


