In [142]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("Learning_Spark") \
    .config("spark.executor.instances", 2) \
    .config("spark.executor.memory", f'{int(2000/4.4)}mb') \
    .config("spark.executor.cores", 2) \
    .getOrCreate()

sc = spark.sparkContext

In [143]:
#считываем файлы
test_data = sc.textFile('test.csv')
train_data = sc.textFile('train.csv')

In [144]:
train_data.take(3)

['battery_power,blue,clock_speed,dual_sim,fc,four_g,int_memory,m_dep,mobile_wt,n_cores,pc,px_height,px_width,ram,sc_h,sc_w,talk_time,three_g,touch_screen,wifi,price_range',
 '842,0,2.2,0,1,0,7,0.6,188,2,2,20,756,2549,9,7,19,0,0,1,1',
 '1021,1,0.5,1,0,1,53,0.7,136,3,6,905,1988,2631,17,3,7,1,1,0,2']

In [145]:
test_data.take(3)

['id,battery_power,blue,clock_speed,dual_sim,fc,four_g,int_memory,m_dep,mobile_wt,n_cores,pc,px_height,px_width,ram,sc_h,sc_w,talk_time,three_g,touch_screen,wifi',
 '1,1043,1,1.8,1,14,0,5,0.1,193,3,16,226,1412,3476,12,7,2,0,1,0',
 '2,841,1,0.5,1,4,1,61,0.8,191,5,12,746,857,3895,6,0,7,1,0,0']

In [146]:
test_header = test_data.first()
train_header = train_data.first()

In [147]:
def good_print(rdd, num):
    for row in rdd.take(num):
        print(row)

In [148]:
test_data = test_data.filter(lambda row: row != test_header)\
           .map(lambda row: [float(col) for col in row.split(',')])
good_print(test_data, 3)

[1.0, 1043.0, 1.0, 1.8, 1.0, 14.0, 0.0, 5.0, 0.1, 193.0, 3.0, 16.0, 226.0, 1412.0, 3476.0, 12.0, 7.0, 2.0, 0.0, 1.0, 0.0]
[2.0, 841.0, 1.0, 0.5, 1.0, 4.0, 1.0, 61.0, 0.8, 191.0, 5.0, 12.0, 746.0, 857.0, 3895.0, 6.0, 0.0, 7.0, 1.0, 0.0, 0.0]
[3.0, 1807.0, 1.0, 2.8, 0.0, 1.0, 0.0, 27.0, 0.9, 186.0, 3.0, 4.0, 1270.0, 1366.0, 2396.0, 17.0, 10.0, 10.0, 0.0, 1.0, 1.0]


In [149]:
train_data = train_data.filter(lambda row: row != train_header)\
           .map(lambda row: [float(col) for col in row.split(',')])
good_print(train_data, 3)

[842.0, 0.0, 2.2, 0.0, 1.0, 0.0, 7.0, 0.6, 188.0, 2.0, 2.0, 20.0, 756.0, 2549.0, 9.0, 7.0, 19.0, 0.0, 0.0, 1.0, 1.0]
[1021.0, 1.0, 0.5, 1.0, 0.0, 1.0, 53.0, 0.7, 136.0, 3.0, 6.0, 905.0, 1988.0, 2631.0, 17.0, 3.0, 7.0, 1.0, 1.0, 0.0, 2.0]
[563.0, 1.0, 0.5, 1.0, 2.0, 1.0, 41.0, 0.9, 145.0, 5.0, 6.0, 1263.0, 1716.0, 2603.0, 11.0, 2.0, 9.0, 1.0, 1.0, 0.0, 2.0]


In [150]:
# формируем список совпадающих колонок
A = set(test_header.split(',')) 
B = set(train_header.split(',')) 
headers_intersect = sorted(list(A & B))

header = train_header.split(',')
train_indexes = [header.index(col_name) for col_name in headers_intersect]

header = test_header.split(',')
test_indexes = [header.index(col_name) for col_name in headers_intersect]

# берем из данных только совпапдающие колонки
test_data = test_data.map(lambda x: [x[idx] for idx in test_indexes])
train_data = train_data.map(lambda x: [x[idx] for idx in train_indexes])

In [151]:
good_print(test_data, 3)

[1043.0, 1.0, 1.8, 1.0, 14.0, 0.0, 5.0, 0.1, 193.0, 3.0, 16.0, 226.0, 1412.0, 3476.0, 12.0, 7.0, 2.0, 0.0, 1.0, 0.0]
[841.0, 1.0, 0.5, 1.0, 4.0, 1.0, 61.0, 0.8, 191.0, 5.0, 12.0, 746.0, 857.0, 3895.0, 6.0, 0.0, 7.0, 1.0, 0.0, 0.0]
[1807.0, 1.0, 2.8, 0.0, 1.0, 0.0, 27.0, 0.9, 186.0, 3.0, 4.0, 1270.0, 1366.0, 2396.0, 17.0, 10.0, 10.0, 0.0, 1.0, 1.0]


In [152]:
good_print(train_data, 3)

[842.0, 0.0, 2.2, 0.0, 1.0, 0.0, 7.0, 0.6, 188.0, 2.0, 2.0, 20.0, 756.0, 2549.0, 9.0, 7.0, 19.0, 0.0, 0.0, 1.0]
[1021.0, 1.0, 0.5, 1.0, 0.0, 1.0, 53.0, 0.7, 136.0, 3.0, 6.0, 905.0, 1988.0, 2631.0, 17.0, 3.0, 7.0, 1.0, 1.0, 0.0]
[563.0, 1.0, 0.5, 1.0, 2.0, 1.0, 41.0, 0.9, 145.0, 5.0, 6.0, 1263.0, 1716.0, 2603.0, 11.0, 2.0, 9.0, 1.0, 1.0, 0.0]


In [153]:
#объединяем train и test 
data = train_data.union(test_data)

In [162]:
#находим распределение соотношения сторон экрана
def screen_ratio(row):
    height, width = row[14], row[15] 
    if height != 0 and width / height >= 16/9: 
        return (round(width / height, 2), 1)
    else:
        return (0, 1)
    
distrib = dict(data.map(lambda row: screen_ratio(row)).countByKey())
distrib = dict(sorted(distrib.items(), key=lambda x:x[0]))
print(f'screen_ratio distribution: {distrib}')

screen_ratio distribution: {0: 3000}


In [161]:
#находим распределение плотности пикселей
def PPI(row):
    height, width = row[14], row[15]
    diag = 0.393701 * (height**2 + width**2)**0.5
    px_height, px_width = row[11], row[12]
    diag_px = (px_height**2 + px_width**2)**0.5
    return (round(diag_px / diag), 1)

distrib = dict(data.map(lambda row: PPI(row)).countByKey())
distrib = dict(sorted(distrib.items(), key=lambda x:x[0]))
print(f'PPI distribution: {distrib}')

PPI distribution: {56: 1, 61: 3, 63: 1, 64: 1, 65: 1, 67: 2, 68: 1, 69: 1, 70: 1, 71: 1, 72: 2, 73: 3, 74: 2, 76: 2, 77: 1, 78: 2, 79: 6, 80: 5, 81: 3, 82: 6, 83: 8, 84: 6, 85: 4, 86: 3, 87: 8, 88: 5, 89: 5, 90: 4, 91: 3, 92: 6, 93: 6, 94: 3, 95: 4, 96: 8, 97: 5, 98: 4, 99: 6, 100: 5, 101: 4, 102: 8, 103: 7, 104: 3, 105: 6, 106: 3, 107: 5, 108: 2, 109: 7, 110: 8, 111: 5, 112: 8, 113: 3, 114: 8, 115: 5, 116: 6, 117: 8, 118: 9, 119: 8, 120: 9, 121: 11, 122: 13, 123: 6, 124: 7, 125: 7, 126: 6, 127: 8, 128: 12, 129: 5, 130: 2, 131: 4, 132: 10, 133: 8, 134: 5, 135: 7, 136: 9, 137: 12, 138: 15, 139: 11, 140: 8, 141: 3, 142: 6, 143: 8, 144: 11, 145: 9, 146: 3, 147: 9, 148: 6, 149: 5, 150: 11, 151: 10, 152: 5, 153: 9, 154: 10, 155: 8, 156: 7, 157: 11, 158: 11, 159: 6, 160: 3, 161: 6, 162: 11, 163: 6, 164: 6, 165: 10, 166: 12, 167: 7, 168: 11, 169: 11, 170: 7, 171: 12, 172: 10, 173: 8, 174: 10, 175: 12, 176: 13, 177: 2, 178: 5, 179: 9, 180: 12, 181: 12, 182: 11, 183: 10, 184: 8, 185: 11, 186: 7

In [187]:
train = spark.read.csv('train.csv', header=True, inferSchema=True)

In [188]:
train.show(1, vertical=True)

-RECORD 0-------------
 battery_power | 842  
 blue          | 0    
 clock_speed   | 2.2  
 dual_sim      | 0    
 fc            | 1    
 four_g        | 0    
 int_memory    | 7    
 m_dep         | 0.6  
 mobile_wt     | 188  
 n_cores       | 2    
 pc            | 2    
 px_height     | 20   
 px_width      | 756  
 ram           | 2549 
 sc_h          | 9    
 sc_w          | 7    
 talk_time     | 19   
 three_g       | 0    
 touch_screen  | 0    
 wifi          | 1    
 price_range   | 1    
only showing top 1 row



In [189]:
test = spark.read.csv('test.csv', header=True, inferSchema=True)

In [190]:
test.show(1, vertical=True)

-RECORD 0-------------
 id            | 1    
 battery_power | 1043 
 blue          | 1    
 clock_speed   | 1.8  
 dual_sim      | 1    
 fc            | 14   
 four_g        | 0    
 int_memory    | 5    
 m_dep         | 0.1  
 mobile_wt     | 193  
 n_cores       | 3    
 pc            | 16   
 px_height     | 226  
 px_width      | 1412 
 ram           | 3476 
 sc_h          | 12   
 sc_w          | 7    
 talk_time     | 2    
 three_g       | 0    
 touch_screen  | 1    
 wifi          | 0    
only showing top 1 row



In [217]:
from pyspark.sql.functions import col, date_trunc

# меняем тип всех столбцов на float
test = test.select(*(col(column).cast("float").alias(column) for column in test.columns))
train = train.select(*(col(column).cast("float").alias(column) for column in train.columns))

train.dtypes

[('battery_power', 'float'),
 ('blue', 'float'),
 ('clock_speed', 'float'),
 ('dual_sim', 'float'),
 ('fc', 'float'),
 ('four_g', 'float'),
 ('int_memory', 'float'),
 ('m_dep', 'float'),
 ('mobile_wt', 'float'),
 ('n_cores', 'float'),
 ('pc', 'float'),
 ('px_height', 'float'),
 ('px_width', 'float'),
 ('ram', 'float'),
 ('sc_h', 'float'),
 ('sc_w', 'float'),
 ('talk_time', 'float'),
 ('three_g', 'float'),
 ('touch_screen', 'float'),
 ('wifi', 'float'),
 ('price_range', 'float')]

In [223]:
# объединяем train и test 
df = train.unionByName(test, allowMissingColumns = True)

# добавляем два новых столбца screen_ratio, PPI
df_save = df.select('talk_time', 'sc_w', 'sc_h', 'px_height', 'px_width') \
              .withColumn('screen_ratio', col('sc_w') / col('sc_h')) \
              .withColumn('PPI',  \
                   (col('px_height')**2 + col('px_width')**2)**0.5 \
                 / (0.393701 * (col('sc_h')**2 + col('sc_w')**2)**0.5))

In [219]:
df_save.show()

+---------+----+----+---------+--------+--------------------+------------------+
|talk_time|sc_w|sc_h|px_height|px_width|        screen_ratio|               PPI|
+---------+----+----+---------+--------+--------------------+------------------+
|     19.0| 7.0| 9.0|     20.0|   756.0|  0.7777777777777778|168.47502240137618|
|      7.0| 3.0|17.0|    905.0|  1988.0| 0.17647058823529413|   321.39398909893|
|      9.0| 2.0|11.0|   1263.0|  1716.0| 0.18181818181818182|  484.058701710938|
|     11.0| 8.0|16.0|   1216.0|  1786.0|                 0.5|306.79264370826786|
|     15.0| 2.0| 8.0|   1208.0|  1212.0|                0.25| 527.0841869616228|
|     10.0| 1.0|17.0|   1004.0|  1654.0|0.058823529411764705|288.59370847632863|
|     18.0| 8.0|13.0|    381.0|  1018.0|  0.6153846153846154|180.87128652921112|
|      5.0| 3.0|16.0|    512.0|  1149.0|              0.1875| 196.2732594235708|
|     20.0| 1.0|17.0|    386.0|   836.0|0.058823529411764705| 137.3424378662623|
|     12.0|10.0|19.0|   1137

In [None]:
df_save.write.options(header='True', delimiter=';') \
             .csv("data.csv")