# Apache Spark and Big Data

In [None]:
pip install pyspark



In [None]:
## access to drive files
# import pandas as pd
# df = pd.read_csv('/content/drive/MyDrive/projects/ai/big-data/restaurant.csv' )
# df

In [None]:
from pyspark import SparkConf, SparkContext
import collections

sc = SparkContext()
try:
  rdd = sc.parallelize([3,4,56,7,4,2])
  sq = rdd.map(lambda x:x*x)
  print(sq.collect())
except Exception as e:
  print('error', e)
finally:
  sc.stop()


[9, 16, 3136, 49, 16, 4]


## Ratings Histogram

## Read u.data file - 100.000 lines data

In [None]:
from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster('local').setAppName('rating-histogram')
sc= SparkContext(conf=conf)
try:
  lines = sc.textFile('u.data')
  ratings = lines.map(lambda x:x.split()[2])
  result = ratings.countByValue()
  print(result)
  sortedResults = collections.OrderedDict(sorted(result.items()))

  print(sortedResults)
  for key,value in sortedResults.items():
    print(key, value)
except Exception as e:
  print("error", e)
finally:
  sc.stop()

defaultdict(<class 'int'>, {'5': 21203, '1': 6111, '3': 27145, '2': 11370, '4': 34174})
OrderedDict([('1', 6111), ('2', 11370), ('3', 27145), ('4', 34174), ('5', 21203)])
1 6111
2 11370
3 27145
4 34174
5 21203


## Collect Min Max Temperature in Stations

In [None]:
from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster('local').setAppName('min-temperature')
sc= SparkContext(conf=conf)
print("sc started")

def parse_line(line):
  fields = line.split(',')
  station_id = fields[0]
  entry_type = fields[2]
  temperature = float(fields[3])*0.1*(9.0/5.0)+32
  return (station_id, entry_type, temperature)

try:
  lines = sc.textFile('1800.csv')
  #print(lines[3])
  parsed_lines = lines.map(parse_line)
  min_temps = parsed_lines.filter(lambda x: 'TMIN' in x[1])
  station_temps= min_temps.map(lambda x: (x[0], x[2]))
  min_temps = station_temps.reduceByKey(lambda x,y: min(x,y))
  results = min_temps.collect()
  print(results)
except Exception as e:
  print("error", e)
finally:
  sc.stop()
  print("sc stopped")

sc started
[('ITE00100554', 5.359999999999999), ('EZE00100082', 7.699999999999999)]
sc stopped


In [None]:
from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster('local').setAppName('max-temperature')
sc= SparkContext(conf=conf)
print("sc started")

def parse_line(line):
  fields = line.split(',')
  station_id = fields[0]
  entry_type = fields[2]
  temperature = float(fields[3])*0.1*(9.0/5.0)+32
  return (station_id, entry_type, temperature)

try:
  lines = sc.textFile('1800.csv')
  #print(lines[3])
  parsed_lines = lines.map(parse_line)
  min_temps = parsed_lines.filter(lambda x: 'TMAX' in x[1])
  station_temps= min_temps.map(lambda x: (x[0], x[2]))
  min_temps = station_temps.reduceByKey(lambda x,y: min(x,y))
  results = min_temps.collect()
  print(results)
except Exception as e:
  print("error", e)
finally:
  sc.stop()
  print("sc stopped")

sc started
[('ITE00100554', 18.5), ('EZE00100082', 16.52)]
sc stopped


## Most common words in book

In [None]:
from pyspark import SparkConf, SparkContext
import re

conf = SparkConf().setMaster('local').setAppName('word-count')
sc = SparkContext(conf=conf)
print("sc started")

def normalize_words(text):
  return re.compile(r'\W+', re.UNICODE).split(text.lower())

try:
  input = sc.textFile('book.txt')
  words = input.flatMap(normalize_words)

  word_counts = words.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
  sorted_word_counts = word_counts.map(lambda x: (x[1], x[0])).sortByKey(False)
  results = sorted_word_counts.collect()

  for r in results:
    c = str(r[0])
    w = r[1].encode('ascii', 'ignore')
    if (w):
      print(w.decode()+ ":\t\t"+ c)

  # for w,c in word_counts.items():
    # print(w,c)

except Exception as e:
  print("error", e)
finally:
  sc.stop()
  print("sc stopped")

sc started
you:		1878
to:		1828
your:		1420
the:		1292
a:		1191
of:		970
and:		934
that:		747
it:		649
in:		616
is:		560
for:		537
on:		428
are:		424
if:		411
s:		391
i:		387
business:		383
can:		376
be:		369
as:		343
have:		321
with:		315
t:		301
this:		280
or:		278
time:		255
but:		242
they:		234
will:		231
what:		229
at:		220
my:		215
re:		214
do:		207
not:		203
about:		202
more:		200
product:		182
an:		178
up:		177
need:		174
them:		166
from:		166
how:		163
there:		162
out:		161
new:		153
people:		145
work:		144
so:		143
just:		142
own:		140
all:		137
don:		133
get:		123
customers:		123
by:		122
want:		122
company:		122
their:		122
some:		121
ll:		114
self:		111
website:		109
make:		108
may:		107
even:		104
when:		102
one:		100
ve:		95
than:		92
also:		91
job:		90
much:		90
who:		88
money:		86
was:		85
these:		82
find:		81
sales:		80
only:		79
into:		79
yourself:		78
other:		78
like:		78
no:		76
probably:		76
employment:		75
ads:		75
day:		73
good:		72
many:		71
before:		70
most:		

In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("Wordcount")
sc = SparkContext(conf = conf)

input=sc.textFile('book.txt')
words=input.flatMap(lambda x: x.split())
wordCounts=words.countByValue()

for word, count in wordCounts.items():
  print(word, count)

sc.stop()

you:		1878
to:		1828
your:		1420
the:		1292
a:		1191
of:		970
and:		934
that:		747
it:		649
in:		616
is:		560
for:		537
on:		428
are:		424
if:		411
s:		391
i:		387
business:		383
can:		376
be:		369
as:		343
have:		321
with:		315
t:		301
this:		280
or:		278
time:		255
but:		242
they:		234
will:		231
what:		229
at:		220
my:		215
re:		214
do:		207
not:		203
about:		202
more:		200
product:		182
an:		178
up:		177
need:		174
them:		166
from:		166
how:		163
there:		162
out:		161
new:		153
people:		145
work:		144
so:		143
just:		142
own:		140
all:		137
don:		133
get:		123
customers:		123
by:		122
want:		122
company:		122
their:		122
some:		121
ll:		114
self:		111
website:		109
make:		108
may:		107
even:		104
when:		102
one:		100
ve:		95
than:		92
also:		91
job:		90
much:		90
who:		88
money:		86
was:		85
these:		82
find:		81
sales:		80
only:		79
into:		79
yourself:		78
other:		78
like:		78
no:		76
probably:		76
employment:		75
ads:		75
day:		73
good:		72
many:		71
before:		70
most:		70
might:		

In [None]:
from pyspark import SparkConf, SparkContext
import re

conf = SparkConf().setMaster('local').setAppName('popular-hero')
sc = SparkContext(conf=conf)
print("sc started")

def count_co_occurences(line):
  elements = line.split()
  return (int(elements[0]), len(elements) - 1)

def parse_names(line):
  fields = line.split('\"')
  return (int(fields[0]), fields[1].encode("utf8"))

try:
  names = sc.textFile('Marvel-names.txt')
  names_rdd = names.map(parse_names)

  lines = sc.textFile('Marvel-graph.txt')

  pairings = lines.map(count_co_occurences)
  total_friends_by_chracters = pairings.reduceByKey(lambda x,y: x+y)
  flipped = total_friends_by_chracters.map(lambda xy: (xy[1], xy[0]))

  most_popular = flipped.max()
  most_popular_name = names_rdd.lookup(most_popular[1])[0]
  print(str(most_popular_name)+ " is the most popular superhero, with ", str(most_popular[0])+ " co-appearances.")

except Exception as e:
  print("error", e)
finally:
  sc.stop()
  print("sc stopped")

sc started
b'CAPTAIN AMERICA' is the most popular superhero, with  1933 co-appearances.
sc stopped


## Machine Learning with Spark

In [None]:
#Step 0: Import Libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

#Step 1: Initialize Spark
spark = SparkSession.builder.appName("pima-indian-classifier").getOrCreate()
print("spark started")


try:
  # Step 2: Load the dataset
  data = spark.read.csv('pima-indians-diabetes.csv', header=True, inferSchema=True)

  # Step 3: Prepare the data for training
  predictors = data.columns[:-1]
  assembler = VectorAssembler(inputCols=predictors, outputCol='features')
  data = assembler.transform(data).select('features', 'Outcome')

  # Step 4: Split the data into training and testing sets
  train_data, test_data = data.randomSplit([0.7,0.3], seed=42)

  # Step 5: Train a logistic regression model
  lr = LogisticRegression(labelCol='Outcome', featuresCol='features')
  model=lr.fit(train_data)

  # Step 6: Make predictions on the test data
  predictions = model.transform(test_data)

  # Step 7: Evaluate the model
  evaluator = BinaryClassificationEvaluator(labelCol='Outcome')
  accuracy = evaluator.evaluate(predictions)
  print('Accuracy: ',accuracy )


except Exception as e:
  print("error", e)
finally:
  spark.stop()
  print("spark stopped")

spark started
Accuracy:  0.8546265328874024
spark stopped
