In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 64.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=526152c795c1a7a58438e2a2a2b9f336d9c4c9074d46efaf8dbc55013af9989c
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
from pyspark.sql import SparkSession
from operator import add

In [13]:
import math

In [3]:
# Local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Render DataFrames as tables when they are a cell's last expression.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [4]:
# Low-level SparkContext of the session; the cells below use it for the RDD API
# (sc.parallelize).
sc = spark.sparkContext

In [5]:
# Small numeric sample used by the scaling examples that follow.
data = [
    54.4, 25, 15, 45.34,
    12, 45, 78,
]
data_rdd = sc.parallelize(data)
print(data_rdd.__class__)
data_rdd.collect()

<class 'pyspark.rdd.RDD'>


[54.4, 25, 15, 45.34, 12, 45, 78]

# 1) Escalonamiento

$x_{sc} = \frac{x - \min(x)}{\max(x) - \min(x)}$

Cada valor queda en el rango $[0, 1]$: el mínimo se transforma en 0 y el máximo en 1.

In [6]:
# Min-max scaling: x_sc = (x - min(x)) / (max(x) - min(x)), mapping the data into [0, 1].
# Dedicated names (instead of `min` / `max`) keep the Python builtins usable in later cells.
data_min = data_rdd.min()
data_max = data_rdd.max()
data_range = data_max - data_min
# Constant data has zero range; map it to 0.0 instead of raising ZeroDivisionError.
minmax_scale = data_rdd.map(
    lambda x: (x - data_min) / data_range if data_range else 0.0
)
print(minmax_scale.collect())

[0.6424242424242425, 0.19696969696969696, 0.045454545454545456, 0.5051515151515152, 0.0, 0.5, 1.0]


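# 2) Estandarización

$z = \frac{x - \bar{x}}{\sigma}$, donde $\bar{x}$ es la media y $\sigma$ la desviación estándar poblacional.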

In [11]:
# Standardization (z-score): z = (x - mean) / std.
# RDD.mean() and RDD.stdev() each run their own stats() pass; one call yields both.
data_stats = data_rdd.stats()
media = data_stats.mean()
# Population standard deviation (divides by N); sampleStdev() would divide by N - 1.
std = data_stats.stdev()
# With zero spread every value equals the mean, so each z-score is 0.0 (avoids ZeroDivisionError).
standardization = data_rdd.map(lambda xi: (xi - media) / std if std else 0.0)
print(standardization.collect())

[0.69416017146692, -0.6527959221394553, -1.1109442532980864, 0.27907778343720047, -1.2483887526456756, 0.26350074017780684, 1.7753902330012894]


# 3) Normalización

$x_{norm} = \frac{x}{\lVert x \rVert_2} = \frac{x}{\sqrt{\sum_i x_i^2}}$

In [15]:
# L2 normalization: divide every value by the Euclidean norm sqrt(sum(x_i ** 2)).
total = data_rdd.map(lambda xi: xi * xi).sum()
val = math.sqrt(total)
# An all-zero vector has norm 0; keep it as zeros instead of raising ZeroDivisionError.
normalized = data_rdd.map(lambda xi: xi / val if val else 0.0)
print(normalized.collect())

[0.45783727509990346, 0.2104031595128233, 0.12624189570769398, 0.38158717009245635, 0.10099351656615517, 0.3787256871230819, 0.6564578576800086]


# 5) Binarización

Cada categoría se reemplaza por un código entero (0/1 cuando hay exactamente dos categorías).

In [48]:
data = ['male', 'female', 'male', 'male', 'female', 'male', 'male']
data_rdd = sc.parallelize(data)
print(type(data_rdd))
print(data_rdd.collect())

kay_value = data_rdd.distinct().zipWithIndex()
binarized = data_rdd.map(lambda x: key_value[x])

print(binarized.collect())

<class 'pyspark.rdd.RDD'>
['male', 'female', 'male', 'male', 'female', 'male', 'male']
[1, 0, 1, 1, 0, 1, 1]


# Term Frequency (TF)

In [7]:
# Three short documents, each paired with its 1-based document id.
document1 = 'la oración del documento dos es verdadera'
document2 = 'la oración del documento uno es falsa'
document3 = 'la oración del documento uno y la oración del documento dos son verdaderas'
data = list(enumerate([document1, document2, document3], start=1))
lines = sc.parallelize(data)
lines.collect()

[(1, 'la oración del documento dos es verdadera'),
 (2, 'la oración del documento uno es falsa'),
 (3,
  'la oración del documento uno y la oración del documento dos son verdaderas')]

In [8]:
# Emit ((doc_id, word), 1) for every whitespace-separated token of each document.
map1 = lines.flatMap(
    lambda doc: (((doc[0], word), 1) for word in doc[1].split())
)
# Sum the 1s: raw count of each word within each document.
reduce = map1.reduceByKey(add)
# Re-key by word: (word, (doc_id, count)).
tf = reduce.map(lambda pair: (pair[0][1], (pair[0][0], pair[1])))
tf.collect()

[('la', (1, 1)),
 ('documento', (1, 1)),
 ('dos', (1, 1)),
 ('es', (1, 1)),
 ('verdadera', (1, 1)),
 ('oración', (2, 1)),
 ('del', (2, 1)),
 ('la', (3, 2)),
 ('documento', (3, 2)),
 ('uno', (3, 1)),
 ('dos', (3, 1)),
 ('verdaderas', (3, 1)),
 ('oración', (1, 1)),
 ('del', (1, 1)),
 ('la', (2, 1)),
 ('documento', (2, 1)),
 ('uno', (2, 1)),
 ('es', (2, 1)),
 ('falsa', (2, 1)),
 ('oración', (3, 2)),
 ('del', (3, 2)),
 ('y', (3, 1)),
 ('son', (3, 1))]