In [67]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


In [68]:
spark

In [69]:
# Reading from text file

df_text = spark.read.text("data/bodies.txt")

df_text.head(4)

[Row(value='182 86 M'),
 Row(value='193 112 M'),
 Row(value='172 72 M'),
 Row(value='170 61 F')]

In [70]:
# Reading from CSV file

df_csv = spark.read.csv("data/iris.csv")

df_csv.head(4)

[Row(_c0='sepal_length', _c1='sepal_width', _c2='petal_length', _c3='petal_width', _c4='species'),
 Row(_c0='5.1', _c1='3.5', _c2='1.4', _c3='0.2', _c4='setosa'),
 Row(_c0='4.9', _c1='3', _c2='1.4', _c3='0.2', _c4='setosa'),
 Row(_c0='4.7', _c1='3.2', _c2='1.3', _c3='0.2', _c4='setosa')]

In [71]:
# Reading From URL

url = 'https://raw.githubusercontent.com/ramashanker/dataset/master/climate/India/India_Sub_Division_IMD_2017.csv'
from pyspark import SparkFiles
spark.sparkContext.addFile(url)
df_url = spark.read.csv("file://"+SparkFiles.get("India_Sub_Division_IMD_2017.csv"), header=True, inferSchema= True)
df_url.head(5)

[Row(SUBDIVISION='Andaman & Nicobar Islands', YEAR=1901, JAN='49.2', FEB='87.1', MAR='29.2', APR='2.3', MAY='528.8', JUN='517.5', JUL='365.1', AUG='481.1', SEP='332.6', OCT='388.5', NOV='558.2', DEC='33.6', ANNUAL='3373.2', JF='136.3', MAM='560.3', JJAS='1696.3', OND='980.3'),
 Row(SUBDIVISION='Andaman & Nicobar Islands', YEAR=1902, JAN='0', FEB='159.8', MAR='12.2', APR='0', MAY='446.1', JUN='537.1', JUL='228.9', AUG='753.7', SEP='666.2', OCT='197.2', NOV='359', DEC='160.5', ANNUAL='3520.7', JF='159.8', MAM='458.3', JJAS='2185.9', OND='716.7'),
 Row(SUBDIVISION='Andaman & Nicobar Islands', YEAR=1903, JAN='12.7', FEB='144', MAR='0', APR='1', MAY='235.1', JUN='479.9', JUL='728.4', AUG='326.7', SEP='339', OCT='181.2', NOV='284.4', DEC='225', ANNUAL='2957.4', JF='156.7', MAM='236.1', JJAS='1874', OND='690.6'),
 Row(SUBDIVISION='Andaman & Nicobar Islands', YEAR=1904, JAN='9.4', FEB='14.7', MAR='0', APR='202.4', MAY='304.5', JUN='495.1', JUL='502', AUG='160.1', SEP='820.4', OCT='222.2', NOV=

In [72]:
import numpy as np
import pandas as pd
# Generate Data
gender_category = ['M', 'F']
gender_probs = [0.4, 0.6]
def generate_dataset():
    gender = np.random.choice(gender_category, p=gender_probs)
    if gender == 'M':
        height= np.random.normal(loc=140, scale=15, size=1);
        weight = np.random.normal(loc=90, scale=10, size=1);
        gender = 'Male'
    else:
        height = np.random.normal(loc=195, scale=10, size=1);
        weight = np.random.normal(loc=60, scale=5, size=1);
        gender = 'Female'
    return int(height[0]),int(weight[0]),gender

samples = pd.DataFrame([generate_dataset() for _ in range(50)],
                  columns=['height','weight','gender'])
samples.to_csv("data/population.csv")
spark_sample = spark.createDataFrame(samples)


In [73]:
#Write dataframe to csv by default its a part
#spark_sample.write.csv('data/part/Normalosa.csv')

In [74]:
# Write a Single file using Spark coalesce() & repartition()
#spark_sample.coalesce(1).write.csv("data/Normalosa.csv")
#spark_sample.repartition(1).write.csv("data/Normalosa.csv")

# Actions

In [76]:
df_population = spark.read.text("data/population.csv")
df_population.show()

+--------------------+
|               value|
+--------------------+
|,height,weight,ge...|
|       0,124,71,Male|
|     1,198,53,Female|
|     2,197,57,Female|
|     3,188,61,Female|
|       4,134,88,Male|
|       5,112,85,Male|
|     6,197,64,Female|
|       7,130,94,Male|
|     8,203,56,Female|
|       9,139,85,Male|
|    10,197,59,Female|
|    11,196,55,Female|
|      12,128,90,Male|
|      13,113,64,Male|
|    14,197,57,Female|
|      15,139,98,Male|
|    16,184,49,Female|
|      17,166,91,Male|
|    18,205,62,Female|
+--------------------+
only showing top 20 rows



In [93]:
data=[("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)]
inputRDD = spark.sparkContext.parallelize(data)
listRdd = spark.sparkContext.parallelize([1,2,3,4,5,3,2])

# reduce
#Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

In [94]:
from operator import add
from operator import mul
listRdd.reduce(add)
listRdd.reduce(mul)

720

# Collect
#Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

In [86]:
listRdd.collect()

[1, 2, 3, 4, 5, 3, 2]

# count:Return the number of elements in the dataset.

In [87]:
listRdd.count()

7

# first: Return the first element of the dataset (similar to take(1)).

In [88]:
listRdd.first()

1

# take(n):Return an array with the first n elements of the dataset.

In [89]:
listRdd.take(3)

[1, 2, 3]

# takeSample(withReplacement, num, [seed]):Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

#takeOrdered(n, [ordering]): Return the first n elements of the RDD using either their natural order or a custom comparator.

# saveAsTextFile(path)

countByKey()

In [92]:
rdd = spark.parallelize([("a", 2), ("b", 1), ("a", 1),("b", 3)])
sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)]

AttributeError: 'SparkSession' object has no attribute 'parallelize'

foreach(func)