In [9]:
#!pip install pyspark



In [3]:
from pyspark import SparkConf, SparkContext
import collections

sc = SparkContext()
rdd = sc.parallelize([3,4,56,7,4,2])

sq = rdd.map(lambda x: x*x)
print(sq.collect())

sc.stop()

[9, 16, 3136, 49, 16, 4]


## Ratings Histograms

In [12]:
from pyspark import SparkConf, SparkContext
import collections

conf=SparkConf().setMaster('local').setAppName("RatingsHistogram")
sc=SparkContext(conf=conf)

lines=sc.textFile('u.data')
ratings=lines.map(lambda x:x.split()[2])
result=ratings.countByValue()
sortedResults=collections.OrderedDict(sorted(result.items()))
for key,value in sortedResults.items():
  print(key,value)

sc.stop()

1 6111
2 11370
3 27145
4 34174
5 21203


In [11]:
sc.stop()

In [19]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext(conf = conf)

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("1800.csv")
parsedLines = lines.map(parseLine)
minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
minTemps = stationTemps.reduceByKey(lambda x, y: min(x,y))
results = minTemps.collect();

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

sc.stop()

ITE00100554	5.36F
EZE00100082	7.70F


In [18]:
sc.stop()

In [20]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MaxTemperatures")
sc = SparkContext(conf = conf)

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("1800.csv")
parsedLines = lines.map(parseLine)
minTemps = parsedLines.filter(lambda x: "TMAX" in x[1])
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
minTemps = stationTemps.reduceByKey(lambda x, y: max(x,y))
results = minTemps.collect();

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

sc.stop()

ITE00100554	90.14F
EZE00100082	90.14F


In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("BookWordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("book.txt")
words = input.flatMap(lambda x: x.split())
wordCounts = words. countByValue()

for word, count in wordCounts.items():
  print(word,count)

sc.stop()

In [22]:
sc.stop()

In [None]:
import re
from pyspark import SparkConf, SparkContext

def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("book.txt")
words = input.flatMap(normalizeWords)

wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey(False)
results = wordCountsSorted.collect()

for result in results:
    count = str(result[0])
    word = result[1].encode('ascii', 'ignore')
    if (word):
        print(word.decode() + ":\t\t" + count)

sc.stop()

In [24]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("PopularHero")
sc = SparkContext(conf = conf)

def countCoOccurences(line):
    elements = line.split()
    return (int(elements[0]), len(elements) - 1)

def parseNames(line):
    fields = line.split('\"')
    return (int(fields[0]), fields[1].encode("utf8"))

names = sc.textFile("Marvel-names.txt")
namesRdd = names.map(parseNames)

lines = sc.textFile("Marvel-graph.txt")

pairings = lines.map(countCoOccurences)
totalFriendsByCharacter = pairings.reduceByKey(lambda x, y : x + y)
flipped = totalFriendsByCharacter.map(lambda xy : (xy[1], xy[0]))

mostPopular = flipped.max()

mostPopularName = namesRdd.lookup(mostPopular[1])[0]

print(str(mostPopularName) + " is the most popular superhero, with " + str(mostPopular[0]) + \
      " co-appearances.")

sc.stop()

b'CAPTAIN AMERICA' is the most popular superhero, with 1933 co-appearances.


In [29]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Step 1: Initialize Spark
spark = SparkSession.builder.appName("PimaIndianClassification").getOrCreate()

# Step 2: Load the dataset
data = spark.read.csv("pima-indians-diabetes.csv", inferSchema=True, header=True)

# Step 3: Prepare the data for training
feature_columns = data.columns[:-1]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data).select("features", "Outcome")

# Step 4: Split the data into training and testing sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

# Step 5: Train a logistic regression model
lr = LogisticRegression(labelCol="Outcome", featuresCol="features")
model = lr.fit(train_data)

# Step 6: Make predictions on the test data
predictions = model.transform(test_data)

# Step 7: Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="Outcome")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.8546265328874024
