# Learning PySpark

## Creating RDDs

In [1]:
from pyspark import SparkContext 
from pyspark import SQLContext
import sys
import os

In [2]:
# public SparkContext(String master,
#                     String appName,
#                     String sparkHome,
#                     scala.collection.Seq<String> jars,
#                     scala.collection.Map<String,String> environment)

## build my first app 
sc = SparkContext(appName ='First Lesson', master = 'local')
sc

In [3]:
stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

In [6]:
swimmersJSON = sqlContext.read.json(stringJSONRDD)

In [7]:
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [3]:
sc.appName

'First Lesson'

In [4]:
## build the sqlContext 
sqlContext = SQLContext(sc)
sqlContext

<pyspark.sql.context.SQLContext at 0x11f2f1bd0>

In [6]:
data = sc.parallelize(
    [('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12), 
     ('Amber', 9)])
data.collect()

[('Amber', 22), ('Alfred', 23), ('Skye', 4), ('Albert', 12), ('Amber', 9)]

In [13]:
data_heterogenous = sc.parallelize([('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain','visited', 4504]])
rdd0 = data_heterogenous.collect()
rdd0
print(type(rdd0), rdd0)

<class 'list'> [('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain', 'visited', 4504]]


In [24]:
data_heterogenous.count()

3

In [14]:
print(rdd0[2][1], rdd0[1]['Porsche'])

visited 100000


### read textFile locally

In [21]:
# when you read a local data, you should add 'file:///' at the beginning of the path
path ='file:///Users/tjmask/Desktop/Semester2/Spark/PySpark/Datasets/VS14MORT.txt'
data_from_file = sc.textFile(path)

In [26]:
data_from_file.take(1)

['                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']

In [27]:
def extractInformation(row):
    import re
    import numpy as np

    selected_indices = [
         2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
         19,21,22,23,24,25,27,28,29,30,32,33,34,
         36,37,38,39,40,41,42,43,44,45,46,47,48,
         49,50,51,52,53,54,55,56,58,60,61,62,63,
         64,65,66,67,68,69,70,71,72,73,74,75,76,
         77,78,79,81,82,83,84,85,87,89
    ]
    
    record_split = re\
        .compile(
            r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + 
            r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + 
            r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
            r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
            r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + 
            r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + 
            r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
    try:
        rs = np.array(record_split.split(row))[selected_indices]
    except:
        rs = np.array(['-99'] * len(selected_indices))
    return rs

In [28]:
data_from_file_conv = data_from_file.map(extractInformation)
data_from_file_conv.map(lambda row: row).take(1)

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

In [29]:
data_2014 = data_from_file_conv.map(lambda row: int(row[16]))
data_2014.take(10)

[2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]

In [30]:
data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
data_2014_2.take(10)

[('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('-99', -99)]

In [31]:
data_from_file_conv.take(1)[0][16]

'2014'

In [32]:
data_from_file_conv.take(1)

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

In [36]:
data_filtered = data_from_file_conv.filter(lambda row: (row[5] == 'F' & row[21] == '0'))
### I guess it's because file is so large?
# data_filtered.count()
# data_filtered.collect()
# data_filtered.take(1)

In [37]:
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
data_2014_flat.take(10)

['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]

In [22]:
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16])+2))
data_2014_flat.take(10)

['2014', 2016, '2014', 2016, '2014', 2016, '2014', 2016, '2014', 2016]

In [32]:
from time import *
start_time = time()
distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
end_time = time()
run_time = end_time - start_time
print(run_time, distinct_gender)

91.19847989082336 ['-99', 'M', 'F']


### sample

In [44]:
fraction = 0.0001
data_sample = data_from_file_conv.sample(False, fraction,666)
data_sample.take(1)

[array(['1', '16', ' ', '0', '04', 'F', '1', '076', ' ', '41', '21', '10',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'U', 'N', ' ', ' ', 'I469',
        '228', '068', '   ', '22', '01', '11I469 ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I469 ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

In [45]:
data_sample.count()

264

In [None]:
## so giant!! why so slow?!
# print('Original dataset: {0}, sample: {1}'.format(data_from_file_conv.count(), data_sample.count()))

In [None]:
### take fixed number of samples
# data_take_sampled = data_from_file_conv.takeSample(False, 2, seed=123)
# data_take_sampled

### Join

In [46]:
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])

rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.collect()

[('b', (4, '6')), ('c', (10, None)), ('a', (1, 4)), ('a', (1, 1))]

In [47]:
print(rdd1.collect(),'======', rdd2.collect())



In [48]:
rdd4 = rdd1.rightOuterJoin(rdd2)
rdd4.collect()

[('b', (4, '6')), ('d', (None, 15)), ('a', (1, 4)), ('a', (1, 1))]

In [49]:
rdd5 = rdd1.join(rdd2)
rdd5.collect()

[('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]

In [51]:
rdd6 = rdd1.intersection(rdd2)
rdd6.collect()

[('a', 1)]

### Partitions, sortBykey, reduce, reduceByKey

In [55]:
## repartitions
rdd1 = rdd1.repartition(3)
# len(rdd1.collect())
rdd1.glom().collect()

[[], [], [('a', 1), ('b', 4), ('c', 10)]]

In [57]:
len(rdd1.glom().collect())

3

In [21]:
rdd1.collect()

[('a', 1), ('b', 4), ('c', 10)]

In [23]:
rdd1.map(lambda row: row[1]).collect()

[1, 4, 10]

In [24]:
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)

15

In [28]:
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x*y)

40

In [59]:
data_reduce = sc.parallelize([1,2,.5,.1,5,.2],2)
data_reduce.collect()

[1, 2, 0.5, 0.1, 5, 0.2]

In [60]:
works = data_reduce.reduce(lambda x, y: x/y)
works

10.0

In [36]:
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)
data_reduce.reduce(lambda x, y: x / y)

0.004

In [73]:
data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],3)
# data_key.reduceByKey(lambda x, y: x + y).collect()

In [63]:
data_key.collect()

[('a', 4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3)]

In [64]:
data_key.reduceByKey(lambda x, y: x+y).collect()

[('b', 4), ('c', 2), ('a', 12), ('d', 5)]

In [66]:
data_key.reduceByKey(lambda x, y: x+y).sortByKey(lambda x: x).collect()

[('a', 12), ('b', 4), ('c', 2), ('d', 5)]

In [50]:
data_key.countByKey().values()
data_key.countByKey().items()
data_key.countByKey().keys()

dict_keys(['a', 'b', 'c', 'd'])

### saveAsTextFile

In [74]:
path = 'file:///Users/tjmask/Desktop/Semester2/Spark/PySpark/Datasets/'
data_key.saveAsTextFile(path+'data_key')

In [75]:
def parseInput(row):
    import re
    
    pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
    row_split = pattern.split(row)
    
    return (row_split[1], int(row_split[2]))
    
data_key_reread = sc \
    .textFile(path+'data_key') \
    .map(parseInput)
data_key_reread.collect()

[('d', 2), ('b', 1), ('d', 3), ('c', 2), ('a', 8), ('a', 4), ('b', 3)]

### foreach & foreachPartition??output?

In [80]:
def f(x): print(x)
a = sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
type(a)

NoneType

In [83]:
def f(iterator):
    for x in iterator:
         print(x)
b = sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
type(b)

NoneType

### read jason

In [87]:
from pyspark.sql import SparkSession

# spark = SparkSession.builder\
#                     .master('local[*]')\
#                     .appName('ysh_sql')\
#                     .getOrCreate()


# ## read dataframe
# path = 'Datasets/example_1.json'
# df = spark.read.json(path, multiLine=True)
# df.show()

# # ## DSL 
# # df.printSchema()
# # df.select('color').show()
# # df.select(df['color'].alias('color1'), df['size']+'1').toDF('c1','s1').show()
# # df.filter(df['color']=='Red').show()
# # # df.groupBy('age').count().show()
# # # df.agg({'age':'avg'}).show()

In [84]:
stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

### Create DataFrame, SQL

In [88]:
# Create DataFrame
swimmersJSON = sqlContext.read.json(stringJSONRDD,multiLine=True)

In [89]:
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [90]:
# Create temporary table
swimmersJSON.createOrReplaceTempView("swimmersJSON")

In [91]:
# DataFrame API
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [95]:
# SQL Query
sqlContext.sql("select age age_new from swimmersJSON").show()

+-------+
|age_new|
+-------+
|     19|
|     22|
|     23|
+-------+



In [96]:
# Print the schema
swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



In [97]:
from pyspark.sql.types import *
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])

# The schema is encoded in a string, using StructType we define the schema using various pyspark.sql.types
schemaString = "id name age eyeColor"
schema = StructType([
    StructField("id", LongType(), True),    
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])
# Apply the schema to the RDD and Create DataFrame
swimmers = sqlContext.createDataFrame(stringCSVRDD, schema)

# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")

In [98]:
stringCSVRDD.collect()

[(123, 'Katie', 19, 'brown'),
 (234, 'Michael', 22, 'green'),
 (345, 'Simone', 23, 'blue')]

In [99]:
# Print the schema
#   Notice that we have redefined id as Long (instead of String)
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



In [100]:
swimmers.show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [101]:
# Execute SQL Query and return the data
sqlContext.sql("select * from swimmers").show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [102]:
# Get count of rows in SQL
sqlContext.sql("select count(1) as total_row from swimmers").show()

+---------+
|total_row|
+---------+
|        3|
+---------+



In [103]:
sqlContext.sql("select * from swimmers where age>=22").show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [107]:
# Query id and age for swimmers with age = 22 via DataFrame API
swimmers.select("id", "age").filter("age == 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [112]:
swimmers.select('id', 'age', 'name').filter('age>=22').show()
swimmers.select('id', 'age', 'name').where('age==22').show()

+---+---+-------+
| id|age|   name|
+---+---+-------+
|234| 22|Michael|
|345| 23| Simone|
+---+---+-------+

+---+---+-------+
| id|age|   name|
+---+---+-------+
|234| 22|Michael|
+---+---+-------+



In [109]:
# Query id and age for swimmers with age = 22 via DataFrame API in another way
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [110]:
# Query id and age for swimmers with age = 22 in SQL
sqlContext.sql("select id, age from swimmers where age == 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [107]:
# Query name and eye color for swimmers with eye color starting with the letter 'b'
sqlContext.sql("select name, eyeColor from swimmers where eyeColor LIKE 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



In [108]:
# Show the values 
swimmers.show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [113]:
# Using Databricks `display` command to view the data easier
display(swimmers)

DataFrame[id: bigint, name: string, age: bigint, eyeColor: string]

In [114]:
# Get count of rows
swimmers.count()

3

In [115]:
# Get the name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



In [121]:
# Set File Paths
flightPerfFilePath = "/databricks-datasets/flights/departuredelays.csv"
airportsFilePath = "https://github.com/drabastomek/learningPySpark/blob/master/Chapter03/flight-data/airport-codes-na.txt"

# Obtain Airports dataset
airports = sqlContext.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
airports.createOrReplaceTempView("airports")

# Obtain Departure Delays dataset
flightPerf = sqlContext.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")

# Cache the Departure Delays dataset 
flightPerf.cache()

In [None]:
# Query Sum of Flight Delays by City and Origin Code (for Washington State)
spark.sql("select a.City, f.origin, sum(f.delay) as Delays from FlightPerformance\ 
          f join airports a on a.IATA = f.origin where a.State = 'WA' group by a.City,\
          f.origin order by sum(f.delay) desc").show()

In [None]:
# Query Sum of Flight Delays by State (for the US)
spark.sql("select a.State, sum(f.delay) as Delays from FlightPerformance f \
          join airports a on a.IATA = f.origin where a.Country = 'USA' group by a.State ").show()

### create DataFrame locally

In [6]:
df0 = sqlContext.createDataFrame([
        ('A', 144.5, 5.9, 33, 'M'),
        ('A', 167.2, 5.4, 45, 'M'),
        ('A', 124.1, 5.2, 23, 'F'),
        ('B', 144.5, 5.9, 33, 'M'),
        ('B', 133.2, 5.7, 54, 'F'),
        ('C', 124.1, 5.2, 23, 'F'),
        ('D', 129.2, 5.3, 42, 'M'),
    ], ['id', 'weight', 'height', 'age', 'gender'])

In [7]:
df0.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  A| 144.5|   5.9| 33|     M|
|  A| 167.2|   5.4| 45|     M|
|  A| 124.1|   5.2| 23|     F|
|  B| 144.5|   5.9| 33|     M|
|  B| 133.2|   5.7| 54|     F|
|  C| 124.1|   5.2| 23|     F|
|  D| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [10]:
df1 = sqlContext.createDataFrame([
        ('A', '1'),
        ('A', '2'),
        ('B', '2'),
        ('B', '3'),
        ('C', '6'),
        ('D', '7'),
    ], ['id', 'test'])

In [11]:
df1.show()

+---+----+
| id|test|
+---+----+
|  A|   1|
|  A|   2|
|  B|   2|
|  B|   3|
|  C|   6|
|  D|   7|
+---+----+



In [13]:
df1.join(df0, 'id', 'inner').show()

+---+----+------+------+---+------+
| id|test|weight|height|age|gender|
+---+----+------+------+---+------+
|  B|   2| 144.5|   5.9| 33|     M|
|  B|   2| 133.2|   5.7| 54|     F|
|  B|   3| 144.5|   5.9| 33|     M|
|  B|   3| 133.2|   5.7| 54|     F|
|  D|   7| 129.2|   5.3| 42|     M|
|  C|   6| 124.1|   5.2| 23|     F|
|  A|   1| 144.5|   5.9| 33|     M|
|  A|   1| 167.2|   5.4| 45|     M|
|  A|   1| 124.1|   5.2| 23|     F|
|  A|   2| 144.5|   5.9| 33|     M|
|  A|   2| 167.2|   5.4| 45|     M|
|  A|   2| 124.1|   5.2| 23|     F|
+---+----+------+------+---+------+



In [131]:
print('Count of rows: {0},\nCount of distinct rows: {1}'.format(df.count(), df.distinct().count()))

Count of rows: 7,
Count of distinct rows: 6


In [125]:
print('Count of rows: {0}'.format(df.count()))
print('Count of distinct rows: {0}'.format(df.distinct().count()))

Count of rows: 7
Count of distinct rows: 6


In [None]:
### dropDuplicates, distinct, select

In [132]:
df = df.dropDuplicates()
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
|  1| 144.5|   5.9| 33|     M|
|  4| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
+---+------+------+---+------+



In [142]:
df.select([c for c in df.columns if c!='id']).show()

+------+------+---+------+
|weight|height|age|gender|
+------+------+---+------+
| 133.2|   5.7| 54|     F|
| 129.2|   5.3| 42|     M|
| 144.5|   5.9| 33|     M|
| 144.5|   5.9| 33|     M|
| 167.2|   5.4| 45|     M|
| 124.1|   5.2| 23|     F|
+------+------+---+------+



In [143]:
print('Count of ids: {0}, \nCount of distinct without ids: {1}'\
      .format(df.count(), df.select([c for c in df.columns if c!='id']).distinct().count()))

Count of ids: 6, 
Count of distinct without ids: 5


In [120]:
print('Count of ids: {0}'.format(df.count()))
print('Count of distinct ids: {0}'.format(df.select([c for c in df.columns if c != 'id']).distinct().count()))

Count of ids: 6
Count of distinct ids: 5


In [144]:
df.select([i for i in df.columns if i!='id']).dropDuplicates().count()

5

In [125]:
df = df.dropDuplicates(subset=[i for i in df.columns if i!='id'])
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [152]:
## input needs to be list
df.dropDuplicates(['id']).show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  3| 124.1|   5.2| 23|     F|
|  2| 167.2|   5.4| 45|     M|
|  4| 144.5|   5.9| 33|     M|
+---+------+------+---+------+



### pyspark.sql.functions

In [154]:
import pyspark.sql.functions as F

df.agg(F.count('id').alias('total_row'), \
       F.countDistinct('id').alias('distinct_row_num')).show()

+---------+----------------+
|total_row|distinct_row_num|
+---------+----------------+
|        6|               5|
+---------+----------------+



In [156]:
df.withColumn('new_id', F.monotonically_increasing_id()).show()
df.withColumn('new_id2', df.id+10).show()

+---+------+------+---+------+-------------+
| id|weight|height|age|gender|       new_id|
+---+------+------+---+------+-------------+
|  5| 133.2|   5.7| 54|     F| 171798691840|
|  5| 129.2|   5.3| 42|     M| 326417514496|
|  1| 144.5|   5.9| 33|     M| 481036337152|
|  4| 144.5|   5.9| 33|     M| 644245094400|
|  2| 167.2|   5.4| 45|     M| 721554505728|
|  3| 124.1|   5.2| 23|     F|1623497637888|
+---+------+------+---+------+-------------+

+---+------+------+---+------+-------+
| id|weight|height|age|gender|new_id2|
+---+------+------+---+------+-------+
|  5| 133.2|   5.7| 54|     F|     15|
|  5| 129.2|   5.3| 42|     M|     15|
|  1| 144.5|   5.9| 33|     M|     11|
|  4| 144.5|   5.9| 33|     M|     14|
|  2| 167.2|   5.4| 45|     M|     12|
|  3| 124.1|   5.2| 23|     F|     13|
+---+------+------+---+------+-------+



In [160]:
df_miss = sqlContext.createDataFrame([
        (1, 143.5, 5.6, 28,   'M',  100000),
        (2, 167.2, 5.4, 45,   'M',  None),
        (3, None , 5.2, None, None, None),
        (4, 144.5, 5.9, 33,   'M',  None),
        (5, 133.2, 5.7, 54,   'F',  None),
        (6, 124.1, 5.2, None, 'F',  None),
        (7, 129.2, 5.3, 42,   'M',  76000),
    ], ['id', 'weight', 'height', 'age', 'gender', 'income'])


In [161]:
df_miss.show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  1| 143.5|   5.6|  28|     M|100000|
|  2| 167.2|   5.4|  45|     M|  null|
|  3|  null|   5.2|null|  null|  null|
|  4| 144.5|   5.9|  33|     M|  null|
|  5| 133.2|   5.7|  54|     F|  null|
|  6| 124.1|   5.2|null|     F|  null|
|  7| 129.2|   5.3|  42|     M| 76000|
+---+------+------+----+------+------+



In [163]:
df_miss.rdd.collect()

[Row(id=1, weight=143.5, height=5.6, age=28, gender='M', income=100000),
 Row(id=2, weight=167.2, height=5.4, age=45, gender='M', income=None),
 Row(id=3, weight=None, height=5.2, age=None, gender=None, income=None),
 Row(id=4, weight=144.5, height=5.9, age=33, gender='M', income=None),
 Row(id=5, weight=133.2, height=5.7, age=54, gender='F', income=None),
 Row(id=6, weight=124.1, height=5.2, age=None, gender='F', income=None),
 Row(id=7, weight=129.2, height=5.3, age=42, gender='M', income=76000)]

In [164]:
df_miss.rdd.map(
    lambda row: (row['id'], sum([c == None for c in row]))
).collect()

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

In [168]:
df_miss.rdd.map(
                lambda row: (row['id'], sum([c==None for c in row]))).collect()

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

In [10]:
df_miss.where('id==3').show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+



In [11]:
import pyspark.sql.functions as fn
df_miss.agg(*[
    (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing')
    for c in df_miss.columns
]).show()

+----------+------------------+--------------+------------------+------------------+------------------+
|id_missing|    weight_missing|height_missing|       age_missing|    gender_missing|    income_missing|
+----------+------------------+--------------+------------------+------------------+------------------+
|       0.0|0.1428571428571429|           0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+----------+------------------+--------------+------------------+------------------+------------------+



In [179]:
df_miss.select('income').count()

7

In [185]:
df_miss_no_income = df_miss.select([c for c in df_miss.columns if c!='income'])
df_miss_no_income.show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  3|  null|   5.2|null|  null|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



### deal with NA and fillna

In [191]:
df_miss_no_income.dropna().show()
df_miss_no_income.dropna(thresh = 3).show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 143.5|   5.6| 28|     M|
|  2| 167.2|   5.4| 45|     M|
|  4| 144.5|   5.9| 33|     M|
|  5| 133.2|   5.7| 54|     F|
|  7| 129.2|   5.3| 42|     M|
+---+------+------+---+------+

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



In [206]:
means = df_miss_no_income.agg(
    *[F.mean(c).alias(c) for c in df_miss_no_income.columns if c!='gender']).toPandas().to_dict('records')[0]
means['gender'] = 'missing'
means

{'age': 40.4,
 'gender': 'missing',
 'height': 5.471428571428571,
 'id': 4.0,
 'weight': 140.28333333333333}

In [204]:
means = df_miss_no_income.agg(
        *[F.mean(c).alias(c) for c in df_miss_no_income.columns if c!='gender'])
means.show()
print(means.toPandas())
means.toPandas().to_dict('records')[0]

+---+------------------+-----------------+----+
| id|            weight|           height| age|
+---+------------------+-----------------+----+
|4.0|140.28333333333333|5.471428571428571|40.4|
+---+------------------+-----------------+----+

    id      weight    height   age
0  4.0  140.283333  5.471429  40.4


{'age': 40.4,
 'height': 5.471428571428571,
 'id': 4.0,
 'weight': 140.28333333333333}

In [207]:
df_miss_no_income.fillna(means).show()

+---+------------------+------+---+-------+
| id|            weight|height|age| gender|
+---+------------------+------+---+-------+
|  1|             143.5|   5.6| 28|      M|
|  2|             167.2|   5.4| 45|      M|
|  3|140.28333333333333|   5.2| 40|missing|
|  4|             144.5|   5.9| 33|      M|
|  5|             133.2|   5.7| 54|      F|
|  6|             124.1|   5.2| 40|      F|
|  7|             129.2|   5.3| 42|      M|
+---+------------------+------+---+-------+



### outlier -- approxQuantile

In [249]:
df_outliers = sqlContext.createDataFrame([
        (1, 143.5, 5.3, 28),
        (2, 154.2, 5.5, 45),
        (3, 342.3, 5.1, 99),
        (4, 144.5, 5.5, 33),
        (5, 133.2, 5.4, 54),
        (6, 124.1, 5.1, 21),
        (7, 129.2, 5.3, 42),
    ], ['id', 'weight', 'height', 'age'])
df_outliers.show()

+---+------+------+---+
| id|weight|height|age|
+---+------+------+---+
|  1| 143.5|   5.3| 28|
|  2| 154.2|   5.5| 45|
|  3| 342.3|   5.1| 99|
|  4| 144.5|   5.5| 33|
|  5| 133.2|   5.4| 54|
|  6| 124.1|   5.1| 21|
|  7| 129.2|   5.3| 42|
+---+------+------+---+



In [250]:
cols = ['weight', 'height', 'age']
bounds = {}

for col in cols:
    quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
    print(quantiles)
    IQR = quantiles[1] - quantiles[0]
    bounds[col] = [quantiles[0] - 1.5 * IQR, quantiles[1] + 1.5 * IQR]

[129.2, 154.2]
[5.1, 5.5]
[28.0, 54.0]


In [251]:
bounds

{'age': [-11.0, 93.0],
 'height': [4.499999999999999, 6.1000000000000005],
 'weight': [91.69999999999999, 191.7]}

In [252]:
df_outliers.select(*['id']+ ['age']).show()

+---+---+
| id|age|
+---+---+
|  1| 28|
|  2| 45|
|  3| 99|
|  4| 33|
|  5| 54|
|  6| 21|
|  7| 42|
+---+---+



In [253]:
outliers = df_outliers.select(*['id'] + [
    (
        (df_outliers[c] < bounds[c][0]) |
        (df_outliers[c] > bounds[c][1])
    ).alias(c + '_o') for c in cols
])
outliers.show()

+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+



In [254]:
df_outliers = df_outliers.join(outliers, on='id')
df_outliers.show()
df_outliers.filter('weight_o').select('id', 'weight').show()
df_outliers.filter('age_o').select('id', 'age').show()

+---+------+------+---+--------+--------+-----+
| id|weight|height|age|weight_o|height_o|age_o|
+---+------+------+---+--------+--------+-----+
|  7| 129.2|   5.3| 42|   false|   false|false|
|  6| 124.1|   5.1| 21|   false|   false|false|
|  5| 133.2|   5.4| 54|   false|   false|false|
|  1| 143.5|   5.3| 28|   false|   false|false|
|  3| 342.3|   5.1| 99|    true|   false| true|
|  2| 154.2|   5.5| 45|   false|   false|false|
|  4| 144.5|   5.5| 33|   false|   false|false|
+---+------+------+---+--------+--------+-----+

+---+------+
| id|weight|
+---+------+
|  3| 342.3|
+---+------+

+---+---+
| id|age|
+---+---+
|  3| 99|
+---+---+



In [255]:
df_outliers.filter('age_o').select('id', 'age').show()

+---+---+
| id|age|
+---+---+
|  3| 99|
+---+---+



In [256]:
df_outliers.filter('weight_o').show()

+---+------+------+---+--------+--------+-----+
| id|weight|height|age|weight_o|height_o|age_o|
+---+------+------+---+--------+--------+-----+
|  3| 342.3|   5.1| 99|    true|   false| true|
+---+------+------+---+--------+--------+-----+



### understand data

In [257]:
import pyspark.sql.types as typ

In [259]:
fraud = sc.textFile('ccFraud.csv.gz')
# header = fraud.first()

fraud = fraud \
    .filter(lambda row: row != header) \
    .map(lambda row: [int(elem) for elem in row.split(',')])

In [None]:
fields = [
    *[
        typ.StructField(h[1:-1], typ.IntegerType(), True)
        for h in header.split(',')
    ]
]

schema = typ.StructType(fields)

In [None]:
fraud_df = spark.createDataFrame(fraud, schema)

In [None]:
fraud_df.printSchema()
fraud_df.groupby('gender').count().show()

In [None]:
numerical = ['balance', 'numTrans', 'numIntlTrans']
desc = fraud_df.describe(numerical)
desc.show()

In [None]:
check skewness
fraud_df.agg({'balance': 'skewness'}).show()

In [None]:
fraud_df.corr('balance', 'numTrans')

In [None]:
n_numerical = len(numerical)

corr = []

for i in range(0, n_numerical):
    temp = [None] * i
    
    for j in range(i, n_numerical):
        temp.append(fraud_df.corr(numerical[i], numerical[j]))
    corr.append(temp)
    
corr

In [266]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('ggplot')

# import bokeh.charts as chrt
from bokeh.io import output_notebook

output_notebook()

In [None]:
hists = fraud_df.select('balance').rdd.flatMap(lambda row: row).histogram(20)

In [None]:
data = {
    'bins': hists[0][:-1],
    'freq': hists[1]
}

fig = plt.figure(figsize=(12,9))
ax = fig.add_subplot(1, 1, 1)
ax.bar(data['bins'], data['freq'], width=2000)
ax.set_title('Histogram of \'balance\'')

plt.savefig('B05793_05_22.png', dpi=300)

In [None]:
data_driver = {'obs': fraud_df.select('balance').rdd.flatMap(lambda row: row).collect()}


fig = plt.figure(figsize=(12,9))
ax = fig.add_subplot(1, 1, 1)

ax.hist(data_driver['obs'], bins=20)
ax.set_title('Histogram of \'balance\' using .hist()')


plt.savefig('B05793_05_24.png', dpi=300)

In [271]:
from pyspark.sql.functions import col
dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
dataset.show(5)

+---+
|key|
+---+
|  0|
|  1|
|  2|
|  0|
|  1|
+---+
only showing top 5 rows



In [283]:
sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2, 2:0.5}, seed=0)
sampled.show(5)
sampled.groupBy("key").count().orderBy("key").show()

+---+
|key|
+---+
|  2|
|  1|
|  1|
|  2|
|  2|
+---+
only showing top 5 rows

+---+-----+
|key|count|
+---+-----+
|  0|    3|
|  1|    7|
|  2|   14|
+---+-----+



## Mllib

In [10]:
import pyspark
from pyspark.sql import SparkSession


spark = SparkSession.builder\
                    .master('local[*]')\
                    .appName('pyspark_basic')\
                    .getOrCreate()

In [12]:
my_list = [['male', 1, None], ['female', 2, 3],['male', 3, 4]]

ds = spark.createDataFrame(my_list, ['A', 'B', 'C'])
#
dp = pd.DataFrame(my_list,columns=['A', 'B', 'C'])
print(dp.head())
ds.show()

        A  B    C
0    male  1  NaN
1  female  2  3.0
2    male  3  4.0
+------+---+----+
|     A|  B|   C|
+------+---+----+
|  male|  1|null|
|female|  2|   3|
|  male|  3|   4|
+------+---+----+



In [34]:
x = list(range(1,3))*2
x.sort()

In [25]:
my_list = [['male', 1, None], ['female', 2, 3],['male', 3, 4]]
a = np.array(my_list).transpose().tolist()
a

[['male', 'female', 'male'], [1, 2, 3], [None, 3, 4]]

In [21]:
import pandas as pd
my_list = [['male', 1, None], ['female', 2, 3],['male', 3, 4]]
my_list
dp = pd.DataFrame(my_list,columns=['A', 'B', 'C'])
dp.transpose()

Unnamed: 0,0,1,2
A,male,female,male
B,1,2,3
C,,3,4


In [13]:
dp.fillna(-99)

Unnamed: 0,A,B,C
0,male,1,-99.0
1,female,2,3.0
2,male,3,4.0


In [15]:
ds.fillna(-99).show()

+------+---+---+
|     A|  B|  C|
+------+---+---+
|  male|  1|-99|
|female|  2|  3|
|  male|  3|  4|
+------+---+---+



In [18]:
# dp.A.replace(['male','female'], [1,0], inplace =True)
dp

Unnamed: 0,A,B,C
0,1,1,
1,0,2,3.0
2,1,3,4.0


In [21]:
ds.na.replace(['male','female'], ['1','0']).show()

+---+---+----+
|  A|  B|   C|
+---+---+----+
|  1|  1|null|
|  0|  2|   3|
|  1|  3|   4|
+---+---+----+



In [29]:
mapping = {'A':'X','B':'y'}

In [30]:
new_names = [mapping.get(col,col) for col in ds.columns]
ds.toDF(*new_names).show(4)

+------+---+----+
|     X|  y|   C|
+------+---+----+
|  male|  1|null|
|female|  2|   3|
|  male|  3|   4|
+------+---+----+



In [35]:
[mapping.get(col,col) for col in ds.columns]

['X', 'y', 'C']

In [33]:
mapping.get('A')

'X'

In [48]:
ds.withColumnRenamed('A', 'W').show()

+------+---+----+
|     W|  B|   C|
+------+---+----+
|  male|  1|null|
|female|  2|   3|
|  male|  3|   4|
+------+---+----+



In [50]:
drop_name = ['A']

In [51]:
ds.drop(*drop_name).show()

+---+----+
|  B|   C|
+---+----+
|  1|null|
|  2|   3|
|  3|   4|
+---+----+



In [54]:
import pyspark.sql.functions as F
ds.withColumn('B_norm', ds.B/ds.groupBy().agg(F.sum("B")).collect()[0][0]).show()

+------+---+----+-------------------+
|     A|  B|   C|             B_norm|
+------+---+----+-------------------+
|  male|  1|null|0.16666666666666666|
|female|  2|   3| 0.3333333333333333|
|  male|  3|   4|                0.5|
+------+---+----+-------------------+



In [6]:
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  4| 144.5|   5.9| 33|     M|
|  5| 133.2|   5.7| 54|     F|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [39]:
import pyspark.sql.functions as F
from pyspark.sql.functions import *
latitude = 42
longitude = -70
df1 = df.withColumn('lat', round(latitude + rand(),6))
df1= df1.withColumn('long', longitude-round(rand()*(4-1),6))
# df1.show()
df2 = df1.dropDuplicates(['id'])
df2.show()

+---+------+------+---+------+---------+----------+
| id|weight|height|age|gender|      lat|      long|
+---+------+------+---+------+---------+----------+
|  5| 133.2|   5.7| 54|     F|42.442708|-70.029239|
|  1| 144.5|   5.9| 33|     M|42.464518|-71.919564|
|  3| 124.1|   5.2| 23|     F|42.555046|-71.556378|
|  2| 167.2|   5.4| 45|     M|42.762363| -70.89219|
|  4| 144.5|   5.9| 33|     M|42.100707|-71.721583|
+---+------+------+---+------+---------+----------+



In [40]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import DataFrameWriter as W
from math import radians, cos, sin, asin, sqrt

In [59]:
def get_distance(longit_a, latit_a, longit_b, latit_b):
    # Transform to radians
    longit_a, latit_a, longit_b, latit_b = map(radians, [longit_a,  latit_a, longit_b, latit_b])
    dist_longit = longit_b - longit_a
    dist_latit = latit_b - latit_a
    # Calculate area
    area = sin(dist_latit/2)**2 + cos(latit_a) * cos(latit_b) * sin(dist_longit/2)**2
    # Calculate the central angle
    central_angle = 2 * asin(sqrt(area))
    radius = 6371
    # Calculate Distance
    distance = central_angle * radius
    return abs(round(distance, 2))

In [60]:
udf_get_distance = F.udf(get_distance)

In [61]:
df3 =  df2.select('id', 'height', 'lat', 'long')
df2_pairs = (df3.crossJoin(df3).toDF(
'id_A', 'height_A', 'lat_A', 'long_A',
'id_B', 'height_A', 'lat_B', 'long_B'))

In [62]:
df3.crossJoin(df3).toDF(
'id_A', 'height_A', 'lat_A', 'long_A',
'id_B', 'height_A', 'lat_B', 'long_B')

DataFrame[id_A: bigint, height_A: double, lat_A: double, long_A: double, id_B: bigint, height_A: double, lat_B: double, long_B: double]

In [63]:
nz_pairs_distance = nz_station_pairs.withColumn(“ABS_DISTANCE”, udf_get_distance(
nz_station_pairs.LONGITUDE_A, nz_station_pairs.LATITUDE_A,
nz_station_pairs.LONGITUDE_B, nz_station_pairs.LATITUDE_B)
).cast(DoubleType())

SyntaxError: invalid character in identifier (<ipython-input-63-a64d1702d441>, line 1)

In [69]:
# import math
math.radians(30)

0.5235987755982988

In [70]:
sqrt(4)

2.0

In [None]:
import random
import sys
import math

42.379999, -72.109233
latitude = 42.03-42.71
longitude = -73.50, -70.66

def generate_random_data(lat, lon, num_rows):
    for _ in range(num_rows):
#         hex1 = '%012x' % random.randrange(16**12) # 12 char random string
#         flt = float(random.randint(0,100))
        dec_lat = random.random()/100
        dec_lon = random.random()/100
#         print('%s %.1f %.6f %.6f \n' % (hex1.lower(), flt, lon+dec_lon, lat+dec_lat))

generate_random_data(latitude, longitude, 5)

In [32]:
df.withColumn('lat', -round(rand()*(4-1),6)).show()

+---+------+------+---+------+---------+
| id|weight|height|age|gender|      lat|
+---+------+------+---+------+---------+
|  1| 144.5|   5.9| 33|     M|-1.099678|
|  2| 167.2|   5.4| 45|     M|-0.111481|
|  3| 124.1|   5.2| 23|     F|-2.410113|
|  4| 144.5|   5.9| 33|     M|-0.583369|
|  5| 133.2|   5.7| 54|     F|-2.711674|
|  3| 124.1|   5.2| 23|     F| -2.31578|
|  5| 129.2|   5.3| 42|     M|-0.527308|
+---+------+------+---+------+---------+



In [57]:
ds.groupBy().agg(F.sum("B")).collect()

[Row(sum(B)=6)]

In [58]:
ds.withColumn('log_B',F.log(ds.B)).show()

+------+---+----+------------------+
|     A|  B|   C|             log_B|
+------+---+----+------------------+
|  male|  1|null|               0.0|
|female|  2|   3|0.6931471805599453|
|  male|  3|   4|1.0986122886681098|
+------+---+----+------------------+



In [59]:
d = {'A':['a','b','c','d'],'B':['m','m','n','n'],'C':[1,2,3,6]}
dp = pd.DataFrame(d)
ds = spark.createDataFrame(dp)

In [60]:
dp

Unnamed: 0,A,B,C
0,a,m,1
1,b,m,2
2,c,n,3
3,d,n,6


In [61]:
ds.show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  m|  1|
|  b|  m|  2|
|  c|  n|  3|
|  d|  n|  6|
+---+---+---+



In [63]:
dp['rank'] = dp.groupby('B')['C'].rank('dense',ascending=False)


In [64]:
dp

Unnamed: 0,A,B,C,rank
0,a,m,1,2.0
1,b,m,2,1.0
2,c,n,3,2.0
3,d,n,6,1.0


In [66]:
from pyspark.sql.window import Window
w = Window.partitionBy('B').orderBy(ds.C.desc())
ds = ds.withColumn('rank',F.rank().over(w))

In [68]:
ds.show()

+---+---+---+----+
|  A|  B|  C|rank|
+---+---+---+----+
|  b|  m|  2|   1|
|  a|  m|  1|   2|
|  d|  n|  6|   1|
|  c|  n|  3|   2|
+---+---+---+----+



In [40]:
import numpy as np
i=[1,2,3]*3
i.sort(reverse = False)
i

[1, 1, 1, 2, 2, 2, 3, 3, 3]

In [41]:
h = 
m = list(range(1,4))*3

m.sort()
m

[1, 1, 1, 2, 2, 2, 3, 3, 3]

In [4]:
data = sc.parallelize(
    [('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12), 
     ('Amber', 9)])
data

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [5]:
data.take(2)

[('Amber', 22), ('Alfred', 23)]

In [None]:
for i in data.collect():
    print(i)

In [48]:
x = sqlContext.read.csv("/Users/tjmask/Desktop/Semester 2/Data Mining/HW1/hw1/Homework_1/Datasets/eye.csv")

In [26]:
x.describe().show()

+-------+-----+--------------------+-------------------+
|summary|  _c0|                 _c1|                _c2|
+-------+-----+--------------------+-------------------+
|  count|  180|                 181|                181|
|   mean| null|-0.00326944500000...|0.10504874055555553|
| stddev| null| 0.47220709838743213|  0.492723292491621|
|    min|  ORB|          -0.0049875|         -0.0049875|
|    max|PUPIL|                   X|                  Y|
+-------+-----+--------------------+-------------------+



In [50]:
z =  sc.textFile("/Users/tjmask/Desktop/Semester 2/Data Mining/HW1/hw1/Homework_1/Datasets/eye.csv")

In [53]:
z.take(10)

[',X,Y',
 'ORB,-0.78304,0.2793',
 'ORB,-0.75304,0.021',
 'ORB,-0.7581,-0.2394',
 'ORB,-0.75312,0.20449',
 'ORB,-0.72319,-0.28928',
 'ORB,-0.69825,0.23441',
 'ORB,-0.69825,0.2793',
 'ORB,-0.69825,0.34913',
 'ORB,-0.69825,0.42394']

In [22]:
y = sc.textFile('/Users/tjmask/Desktop/Courses/health care/hw3/VTINP16_upd.zip')

In [25]:
y.take(1)

["PK\x03\x04\x14\x00\x00\x00\x08\x00T]JN�N��6�2\x00�V�\x00\x0f\x00\x00\x00VTINP16_upd.TXT���r\x1cI�&x\x1f�y���w���v�B,\x04\x01�\x00�\x04qi)����t���3��������\x1e\x1e��_<3�\x08\x06��fj�~�������\x7f�������_��?��������?�������������\x7f�_ݿ��\x7f���잟/~wWw/�������\x1d����@�G�?���������\x18���\x16\x1f���Ň->m�q��[���o8�=�W?�W?�W?�W?�W?�W?�W?�W?�W?㫟�����g|�3��\x19_���~�W?㫟�����/W߯�X�9�����_�\x7f��\x7f����������������������]^o_��������\x1b�\x1e\x7f��������\x7fw/_���w���_^n�]�v��/���\x7f��\x7f��\x7ft�/_��\x0b~��t����~����?�^����+�\x13��\x7f�����^�~^w�/����U��\x7f�?�ۗ�?������\x1b\x11�;�\x11�&��u�~H��xp����/�E��v�ޛ��f�=\x1ac��\x1e��=I�{��v�%&�1��=8Z��>eZ5��/�B?�x\t��\x15�}�~����~�=�\x18��^�/�7.�'�\x19w���\x1f�)_\x1f�?>�?N�h\x17C�\\�\x1dm�s��-�����\x0c�����\x13\x19\x10ͯ��.����+�t���z�+��"]

In [14]:
from pyspark import findspark 
from pyspark import SparkContext 
# import SparkSession 
findspark.init() 
sc = SparkContext(appName="MyFirstApp") 
spark = SparkSession(sc) 
print("Hello World!") 
sc.close() #closing the spark session

ImportError: cannot import name 'findspark' from 'pyspark' (//anaconda3/lib/python3.7/site-packages/pyspark/__init__.py)

In [15]:
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])

rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.take(5)

[('b', (4, '6')), ('c', (10, None)), ('a', (1, 4)), ('a', (1, 1))]

In [16]:
rdd3.collect()

[('b', (4, '6')), ('c', (10, None)), ('a', (1, 4)), ('a', (1, 1))]