# Clean and Save Cleaned Data

In [1]:
import pyspark
from pyspark.sql.types import * 

In [2]:
spark = SparkSession(sc)

In [7]:
filepath = "s3a://final-kmeans/raw/lat_longs.txt"
# filepath = "/home/adrian/6007_Final/data/lat_longs.txt"

In [8]:
# Load a text file and convert each line to a Row.
lines = sc.textFile(filepath) # , use_unicode=False
lines.cache()

In [25]:
# Strip white space from beginning and end of lines
stripped = lines.map(lambda l: l.strip())
# Split by spaces
split = stripped.map(lambda s: s.split(' '))
# latlong = split.map(lambda items: (float(items[0]),float(items[1]),items[2])) # ,items[2]
latlong = split.map(lambda items: (items[0],items[1],items[2])) # ,items[2]

In [26]:
stripped.take(2)

[u'36.7 3.216666666666667 <http://dbpedia.org/resource/Algeria>',
 u'42.5 1.5166666666666666 <http://dbpedia.org/resource/Andorra>']

In [28]:
# View an entry
split.take(2)

[[u'36.7', u'3.216666666666667', u'<http://dbpedia.org/resource/Algeria>'],
 [u'42.5', u'1.5166666666666666', u'<http://dbpedia.org/resource/Andorra>']]

In [29]:
# View an entry
latlong.take(2)

[(u'36.7', u'3.216666666666667', u'<http://dbpedia.org/resource/Algeria>'),
 (u'42.5', u'1.5166666666666666', u'<http://dbpedia.org/resource/Andorra>')]

In [30]:
# Create a dataframe and persist to prevent calling to the S3 bucket for every operation
df_pedia = spark.createDataFrame(latlong)
df_pedia.persist()

DataFrame[_1: string, _2: string, _3: string]

In [31]:
# View dataframe
df_pedia.show(20)

+------------------+------------------+--------------------+
|                _1|                _2|                  _3|
+------------------+------------------+--------------------+
|              36.7| 3.216666666666667|<http://dbpedia.o...|
|              42.5|1.5166666666666666|<http://dbpedia.o...|
|12.516666666666667|-70.03333333333333|<http://dbpedia.o...|
|-8.833333333333334|13.333333333333334|<http://dbpedia.o...|
|41.333333333333336|              19.8|<http://dbpedia.o...|
| 34.53333333333333| 69.13333333333334|<http://dbpedia.o...|
|40.416666666666664|49.833333333333336|<http://dbpedia.o...|
| 39.93333333333333| 32.86666666666667|<http://dbpedia.o...|
| 52.36666666666667|               4.9|<http://dbpedia.o...|
|             50.46|              2.13|<http://dbpedia.o...|
|17.116666666666667|            -61.85|<http://dbpedia.o...|
| 57.04638888888889| 9.919166666666667|<http://dbpedia.o...|
|             56.15|10.216666666666667|<http://dbpedia.o...|
|            34.929|    

In [32]:
# Select relevant columns and impose appropriate names and datatypes
from pyspark.sql.functions import col

df_pedia = df_pedia.selectExpr('cast(_1 as float) as latitude', 'cast(_2 as float) as longitude',
                          '_3 as name_of_page')

In [33]:
# View dataframe schema to confirm proper datatypes
df_pedia.schema

StructType(List(StructField(latitude,FloatType,true),StructField(longitude,FloatType,true),StructField(name_of_page,StringType,true)))

In [34]:
# Save dataframe to S3 bucket as a csv file
outpath = 's3a://final-kmeans/clean'
df_pedia.coalesce(1).write.csv(outpath+'/dbpedia.csv')

In [35]:
# Random subset for later steps - Take random 20% of samples as subset
df_pedia.coalesce(1).sample(False, .2).write.csv(outpath+'/dbpedia_sample.csv')