In [6]:
# Ship the pyspark_csv helper module to the executors, then import it on the
# driver and use it to turn the raw CSV lines into a typed DataFrame.
sc.addPyFile('pyspark_csv.py')
import pyspark_csv as pycsv

# Each element of this RDD is one raw text line of the CSV file.
plaintext_rdd = sc.textFile('datos/caba_para_mapa.csv')
# pyspark_csv infers column names and types (see the schema printed below).
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
# Row-level RDD view of the same data.
data = dataframe.rdd

In [26]:
# Widen the notebook's main container to 97% of the browser window, presumably
# so that wide `show()` tables fit without wrapping.
# `display` is imported from the public `IPython.display` module; importing it
# from `IPython.core.display` is deprecated since IPython 7.14.
from IPython.display import display, HTML
display(HTML("<style>.container { width:97% !important; }</style>"))

In [43]:
# Leave the DataFrame as the cell's last expression so Jupyter renders its repr,
# which lists every column together with its Spark type.
dataframe

DataFrame[created_on: timestamp, property_type: string, place_name: string, state_name: string, lat-lon: string, lat: double, lon: double, price: int, currency: string, price_aprox_local_currency: double, price_aprox_usd: int, surface_total_in_m2: int, surface_covered_in_m2: int, price_usd_per_m2: int, price_per_m2: double, floor: int, rooms: int, expenses: int, properati_url: string, description: string, title: string, dist_a_subte: double, dist_a_tren: double, dist_a_univ: double, dist_a_villa: double, dist_a_zona_anegada: double]

# Bucketizer

In [45]:
# Bucketizer: map the continuous `dist_a_subte` column to discrete bucket
# indices, using split points we choose ourselves (QuantileDiscretizer, in the
# next section, derives them from the data instead).
from pyspark.ml.feature import Bucketizer

# Split points 0, 2, 4, ..., 18, closed with +inf.
# Bucketizer always raises for values outside [splits[0], splits[-1]]
# (handleInvalid only governs NaN values), so without the trailing +inf a
# single distance above 18 would fail the whole job; with it, such values
# fall into a final catch-all bucket.
# NOTE(review): assumes distances are never negative; values below 0 would
# still raise. Prepend float("-inf") if that can happen.
splits = list(range(0, 20, 2)) + [float("inf")]

bucketizer = Bucketizer(splits=splits, inputCol="dist_a_subte", outputCol="bucketsDeSubtes")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataframe)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.select('bucketsDeSubtes').show()

+---------------+
|bucketsDeSubtes|
+---------------+
|            1.0|
|            3.0|
|            3.0|
|            2.0|
|            4.0|
|            0.0|
|            2.0|
|            1.0|
|            1.0|
|            1.0|
|            4.0|
|            3.0|
|            2.0|
|            2.0|
|            2.0|
|            2.0|
|            2.0|
|            2.0|
|            2.0|
|            4.0|
+---------------+
only showing top 20 rows



# QuantileDiscretizer

In [28]:
# QuantileDiscretizer: like Bucketizer, it turns the continuous `dist_a_subte`
# column into discrete buckets. Here, though, you only state HOW MANY buckets
# you want, and fit() derives the split points from the data.
# The split points are not exact: they come from approximate quantiles
# (approxQuantile), whose precision is controlled by the relativeError param.
from pyspark.ml.feature import QuantileDiscretizer

discretizer = QuantileDiscretizer(
    inputCol="dist_a_subte",
    outputCol="bucketsDeSubtes",
    numBuckets=10,
)

# fit() learns the split points; transform() assigns each row its bucket.
discretizer_model = discretizer.fit(dataframe)
result = discretizer_model.transform(dataframe)
result.select('bucketsDeSubtes').show()

+---------------+
|bucketsDeSubtes|
+---------------+
|            3.0|
|            8.0|
|            8.0|
|            6.0|
|            9.0|
|            1.0|
|            5.0|
|            4.0|
|            4.0|
|            4.0|
|            9.0|
|            7.0|
|            5.0|
|            5.0|
|            5.0|
|            5.0|
|            5.0|
|            5.0|
|            5.0|
|            9.0|
+---------------+
only showing top 20 rows



# MaxAbsScaler

In [29]:
# MaxAbsScaler: divide every feature by that feature's maximum absolute value
# across the rows, mapping it into [-1, 1] without shifting the data
# (e.g. 1.0 -> 0.25 and -8.0 -> -1.0 in the output below).
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

# Three rows, each with a 3-dimensional dense feature vector.
maxabs_rows = [
    (0, Vectors.dense([1.0, 0.1, -8.0])),
    (1, Vectors.dense([2.0, 1.0, -4.0])),
    (2, Vectors.dense([4.0, 10.0, 8.0])),
]
dataFrame = spark.createDataFrame(maxabs_rows, ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# fit() computes the per-feature max |value| (MaxAbsScalerModel);
# transform() applies the division.
scalerModel = scaler.fit(dataFrame)
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()

+--------------+----------------+
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]|  [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]|   [1.0,1.0,1.0]|
+--------------+----------------+



# MinMaxScaler

In [30]:
# MinMaxScaler: linearly rescale each feature to the target range [min, max]
# (0 and 1 here, as printed below), based on that feature's observed minimum
# and maximum across the rows.
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

# Three rows, each with a 3-dimensional dense feature vector.
minmax_rows = [
    (0, Vectors.dense([1.0, 0.1, -1.0])),
    (1, Vectors.dense([2.0, 1.1, 1.0])),
    (2, Vectors.dense([3.0, 10.1, 3.0])),
]
dataFrame = spark.createDataFrame(minmax_rows, ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# fit() gathers per-feature min/max (MinMaxScalerModel); transform() rescales.
scalerModel = scaler.fit(dataFrame)
scaledData = scalerModel.transform(dataFrame)

target_range = (scaler.getMin(), scaler.getMax())
print("Features scaled to range: [%f, %f]" % target_range)
scaledData.select("features", "scaledFeatures").show()

Features scaled to range: [0.000000, 1.000000]
+--------------+--------------+
|      features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]| [0.0,0.0,0.0]|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+



# Tokenizer, RegexTokenizer

In [37]:
# Tokenizer vs. RegexTokenizer on the same three sentences.
# - Tokenizer lowercases and splits on whitespace only, so the comma-separated
#   third sentence comes back as a single token.
# - RegexTokenizer with pattern "\\W" (matched as the gaps between tokens)
#   splits on every non-word character, so commas and '-' separate tokens too.
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, size, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,425-351")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, match the tokens instead of the gaps: pattern="\\w+", gaps=False

# Number of tokens per row, computed with the built-in `size` (evaluated in the
# JVM) instead of a Python UDF around len(), which would ship every array to a
# Python worker just to count it. The name `countTokens` is kept as an alias so
# the calls below read the same.
countTokens = size

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)


+--------------------------------------+------------------------------------------+------+
|sentence                              |words                                     |tokens|
+--------------------------------------+------------------------------------------+------+
|Hi I heard about Spark                |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes    |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,425-351|[logistic,regression,models,are,425-351]  |1     |
+--------------------------------------+------------------------------------------+------+

+--------------------------------------+---------------------------------------------+------+
|sentence                              |words                                        |tokens|
+--------------------------------------+---------------------------------------------+------+
|Hi I heard about Spark                |[hi, i, heard, about, spark]            

# NGram

In [38]:
# NGram: replace each row's token list with the list of its consecutive
# n-token phrases (bigrams here, n=2), each phrase joined with a space.
from pyspark.ml.feature import NGram

word_rows = [
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"]),
]
wordDataFrame = spark.createDataFrame(word_rows, ["id", "words"])

ngram = NGram(inputCol="words", outputCol="ngrams", n=2)
ngramDataFrame = ngram.transform(wordDataFrame)

ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



In [39]:
# TF-IDF pipeline, built step by step:
# tokenize each sentence, hash its words into a 20-dimensional term-frequency
# vector (HashingTF), then reweight by inverse document frequency fitted over
# the three sentences (IDF).
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

labelled_sentences = [
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat"),
]
sentenceData = spark.createDataFrame(labelled_sentences, ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# With only 20 hash buckets, distinct words can collide on the same index.
# CountVectorizer is an alternative way to get term-frequency vectors.
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
idf = IDF(inputCol="rawFeatures", outputCol="features")

wordsData = tokenizer.transform(sentenceData)
featurizedData = hashingTF.transform(wordsData)
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[0,5,9,17],[0...|
|  0.0|(20,[2,7,9,13,15]...|
|  1.0|(20,[4,6,13,15,18...|
+-----+--------------------+



# LSH (Locality-Sensitive Hashing)

In [42]:
# Locality-Sensitive Hashing with BucketedRandomProjectionLSH (Euclidean
# distance): fit hash functions on dfA, show the hashes, then use them for an
# approximate similarity join with dfB and an approximate nearest-neighbour
# search around `key`.
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

# dfA: the four corners (+-1, +-1), ids 0-3.
dataA = [(idx, Vectors.dense(xy)) for idx, xy in enumerate(
    [[1.0, 1.0], [1.0, -1.0], [-1.0, -1.0], [-1.0, 1.0]])]
dfA = spark.createDataFrame(dataA, ["id", "features"])

# dfB: the four unit points on the axes, ids 4-7.
dataB = [(idx, Vectors.dense(xy)) for idx, xy in enumerate(
    [[1.0, 0.0], [-1.0, 0.0], [0.0, 1.0], [0.0, -1.0]], start=4)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

# Query point for the nearest-neighbour search.
key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(
    inputCol="features", outputCol="hashes", bucketLength=2.0, numHashTables=3)
model = brp.fit(dfA)

# Feature transformation: add the 'hashes' column to dfA.
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show(truncate=False)

# Approximate similarity join. Hashes are computed on the fly here; passing
# already-transformed datasets, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`, skips that.
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
similar_pairs = model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")
similar_pairs.select(
    col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance"),
).show(truncate=False)

# Approximate nearest-neighbour search. As above, an already-transformed
# dataset could be passed instead, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show(truncate=False)

The hashed dataset where hashed values are stored in the column 'hashes':
+---+-----------+-----------------------+
|id |features   |hashes                 |
+---+-----------+-----------------------+
|0  |[1.0,1.0]  |[[0.0], [0.0], [-1.0]] |
|1  |[1.0,-1.0] |[[-1.0], [0.0], [0.0]] |
|2  |[-1.0,-1.0]|[[-1.0], [-1.0], [0.0]]|
|3  |[-1.0,1.0] |[[0.0], [-1.0], [-1.0]]|
+---+-----------+-----------------------+

Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:
+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|0  |6  |1.0              |
|2  |5  |1.0              |
|2  |7  |1.0              |
|1  |4  |1.0              |
|1  |7  |1.0              |
|0  |4  |1.0              |
|3  |6  |1.0              |
|3  |5  |1.0              |
+---+---+-----------------+

Approximately searching dfA for 2 nearest neighbors of the key:
+---+----------+----------------------+-------+
|id |features  |hashes                |distCol|
+---+----------+--