# Įvairių formatų duomenų failų įkėlimas, konvertavimas ir išsaugojimas

## Python

### SAS `.sas7bdat` formato failų skaitymas panaudojant _Python_ paketą `sas7bdat`

Importuojame _Python_ paketą `sas7bdat`:

In [1]:
import sas7bdat

_SAS_ `.sas7bdat` formato failą nuskaitome ir paverčiame į _Python_ [`pandas`](http://pandas.pydata.org/) [`DataFrame`](http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html) objektą:

In [2]:
sasFilename = 'data/CAB1.sas7bdat'
with sas7bdat.SAS7BDAT(sasFilename) as f:
    pandasDf = f.to_data_frame()

In [3]:
pandasDf

Unnamed: 0,c,x,y
0,1,1.804823,-0.079915
1,1,0.396577,-1.083318
2,1,2.238294,-0.624232
3,1,0.513658,-0.086609
4,1,-0.594179,0.031891
5,1,-0.737799,-0.250139
6,1,0.685005,-0.804158
7,1,-0.744281,-0.795503
8,1,0.340711,-0.300510
9,1,-1.349847,0.432705


`pandas.DataFrame` objekto turinį įrašome į `.csv` formato failą.

In [4]:
csvFilename = sasFilename.replace('.sas7bdat', '.csv')
csvFilename

'data/CAB1.csv'

In [5]:
pandasDf.to_csv(csvFilename)

Išvedame `.csv` failo turinį panaudodami _linux_ komandą `cat`:

In [6]:
! cat data/CAB1.csv

,c,x,y
0,1.0,1.8048229506419473,-0.07991502090275329
1,1.0,0.39657685504793405,-1.0833176549619468
2,1.0,2.2382943650985343,-0.6242322943044329
3,1.0,0.5136577082925808,-0.08660911687902768
4,1.0,-0.5941787333790818,0.031890818063799654
5,1.0,-0.7377985721302719,-0.25013917494627913
6,1.0,0.6850047647381264,-0.8041581316224685
7,1.0,-0.7442810802984545,-0.7955028221064093
8,1.0,0.3407105493184896,-0.3005098035802357
9,1.0,-1.3498465156274604,0.4327048610453688
10,1.0,1.305716242646934,1.4251270103591331
11,1.0,-0.415801019499061,1.6143805422533701
12,1.0,-1.057726424098238,-0.9483326721764074
13,1.0,0.9536476400272708,0.39197894163906866
14,1.0,-0.07614127913511942,1.2205569287348228
15,1.0,-0.6308414189073877,-0.6357582472716945
16,1.0,-0.34000283342507026,-0.07628363677239759
17,1.0,0.9653583786044433,-1.2166983927386992
18,1.0,1.184490002218949,-0.3436939492474335
19,1.0,1.090242787223093,-0.13531306784080646
20,1.0,-1.359501932856284,-2.331337587082326
21,1.0,

Pastebime, kad pirmasis išsaugotas stulpelis neturi pavadinimo. Jo reikšmės yra `pandas.DataFrame` indeksas. Šis indeksas buvo sukurtas duomenų įkėlimo metu ir saugojant duomenis mums nėra reikalingas. 

Parametru `index=None` nurodome, kad nenorime saugoti indekso stulpelio:

In [7]:
pandasDf.to_csv(csvFilename, index=None)

Dar kartą išvedame `.csv` failo turinį panaudodami _linux_ komandą `cat`:

In [8]:
! cat data/CAB1.csv

c,x,y
1.0,1.8048229506419473,-0.07991502090275329
1.0,0.39657685504793405,-1.0833176549619468
1.0,2.2382943650985343,-0.6242322943044329
1.0,0.5136577082925808,-0.08660911687902768
1.0,-0.5941787333790818,0.031890818063799654
1.0,-0.7377985721302719,-0.25013917494627913
1.0,0.6850047647381264,-0.8041581316224685
1.0,-0.7442810802984545,-0.7955028221064093
1.0,0.3407105493184896,-0.3005098035802357
1.0,-1.3498465156274604,0.4327048610453688
1.0,1.305716242646934,1.4251270103591331
1.0,-0.415801019499061,1.6143805422533701
1.0,-1.057726424098238,-0.9483326721764074
1.0,0.9536476400272708,0.39197894163906866
1.0,-0.07614127913511942,1.2205569287348228
1.0,-0.6308414189073877,-0.6357582472716945
1.0,-0.34000283342507026,-0.07628363677239759
1.0,0.9653583786044433,-1.2166983927386992
1.0,1.184490002218949,-0.3436939492474335
1.0,1.090242787223093,-0.13531306784080646
1.0,-1.359501932856284,-2.331337587082326
1.0,-0.40968631040949993,0.6542014011456639
1.0,0.3992570981

Dabar išsaugojome tik duomenis.

**_Pastaba_**: _Jupyter Notebook_ _Code_ rūšies ląstelėje įrašę `!` simbolį galime naudoti _linux_ komandas, pvz:

Išvesti darbinės direktorijos turinį:

In [9]:
! ls -a

.	   examples_datafiles.ipynb	 .gitignore	     README.md
..	   examples_kmeans.ipynb	 hs_err_pid5118.log
data	   examples_kmeans_tmp_ml.ipynb  .ipynb_checkpoints
derby.log  .git				 metastore_db


**_Pastaba:_** _Jupyter Notebook_ darbinė direktorija yra direktorija, kurioje patalpintas dabartinis bloknoto `.ipynb` failas:

In [10]:
! pwd

/home/vagrant/labs/p160m132-examples-spark


Sukurti naują direktoriją `nauja_direktorija` ir išvesti darbinės direktorijos turinį:

In [11]:
! mkdir nauja_direktorija
! ls

data			  examples_kmeans.ipynb		metastore_db
derby.log		  examples_kmeans_tmp_ml.ipynb	nauja_direktorija
examples_datafiles.ipynb  hs_err_pid5118.log		README.md


Ištrinti direktoriją `nauja_direktorija` ir išvesti darbinės direktorijos turinį:

In [12]:
! rm -rf nauja_direktorija
! ls

data	   examples_datafiles.ipynb  examples_kmeans_tmp_ml.ipynb  metastore_db
derby.log  examples_kmeans.ipynb     hs_err_pid5118.log		   README.md


Nadojant simbolį `|` Sukurti komandų seką, kai vienos komandos išvestis tampa kitos komandos įvestimi:

In [13]:
! ls data | grep .xls

CA10.xls
CA1.xls
CA2.xls
CA3.xls
CA4.xls
CA5.xls
CA6.xls
CA7.xls
CA8.xls
CA9.xls


### Excel _.xls_ ir _.xlsx_ formato failų skaitymas

Toliau pateikiama keletas būdų kaip įkelti _Excel_ `.xls` ir `.xlsx` formatų failus ir juos išsaugoti `.csv` formatu.

#### Python `pandas` paketas

Importuojame _Python_ paketą [`pandas`](http://pandas.pydata.org/) ir pakeičiame jo pavadinimą į `pd`:

In [14]:
import pandas as pd

In [15]:
pdDf = pd.read_excel("data/CA1.xls")
pdDf.head()

Unnamed: 0,ID,X,Y,klasteris
0,1,-0.485695,2.075174,1
1,2,-0.368247,2.037708,2
2,3,-1.409078,-1.371631,3
3,4,-1.434896,-1.959709,4
4,5,3.70669,-0.803241,5


Galime išsaugoti `df` turinį į `.csv` formato failą kaip tai darėme su įkeltu `.sas7bdat` failu:

In [16]:
pdDf.to_csv("data/CA1.csv", index=None)

Išvedame pirmas 10 failo `CA1.csv` eilučių:

In [17]:
! head data/CA1.csv

ID,X,Y,klasteris
1,-0.485694686,2.075174209,1
2,-0.3682471,2.037708135,2
3,-1.409078419,-1.371631094,3
4,-1.434896397,-1.959709476,4
5,3.706690333,-0.803240913,5
6,-0.488402521,2.076363672,1
7,-0.365141811,2.053486526,2
8,-1.403284478,-1.387871943,3
9,-1.443300554,-1.940563955,4


#### Python `xlrd` paketas

`xlrd` paketas leidžia dirbti su _Excel_ failais itin žemu abstrakcijos lygmeniu. Jeigu _Excel_ failai yra tvarkingos lentelės, prasidedančios viršutiniame kairiame _Excel_ darbo knygos kampe, jų įkėlimui patogiau naudoti kitus metodus.

Importuojame reikalingus _Python_ paketus ir sukuriame funkciją, kuri nuskaito _Excel_ failą ir jo darbinius lakštus (angl. worksheet) išsaugo `.csv` failų formatu:

In [18]:
import xlrd
import csv
import os

def write_csv_from_excel(excel_filepath):
    dir_path = os.path.dirname(excel_filepath)
    workbook = xlrd.open_workbook(excel_filepath)
    for worksheet_name in workbook.sheet_names():
        worksheet = workbook.sheet_by_name(worksheet_name)
        if not worksheet.nrows:
            continue
        csv_filename = os.path.join(dir_path, worksheet_name) + '.csv'
        with open(csv_filename, 'w') as csvfile:
            csvwriter = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
            for rownum in range(worksheet.nrows):
                csvwriter.writerow(worksheet.row_values(rownum))

In [19]:
write_csv_from_excel("data/CA2.xls")

In [20]:
! ls data

CA10.xls  CA2.csv  CA4.xls  CA7.xls  CAB1.csv
CA1.csv   CA2.xls  CA5.xls  CA8.xls  CAB1.sas7bdat
CA1.xls   CA3.xls  CA6.xls  CA9.xls  Untitled.ipynb


In [21]:
! ls data | grep CA2

CA2.csv
CA2.xls


In [22]:
! head data/CA2.csv

id,X,y,klasteris
1.0,-0.543107008398867,-2.24589337158166,1.0
2.0,-3.05141284857379,-2.79020246961634,1.0
3.0,-2.63406157935159,-0.90642746845988,1.0
4.0,-2.94372086031095,-3.15672525023875,1.0
5.0,-1.59848537397944,-4.507897764527,1.0
6.0,-2.40908579499534,-4.51768121307842,1.0
7.0,-1.71876500225743,-5.25533798241366,1.0
8.0,-1.1705287055537,-2.10686355275734,1.0
9.0,-0.558161109901716,-3.90145324975195,1.0


#### Python `csvkit` paketas ir _Linux_ komandinė eilutė

Jeigu sistemoje yra įdiegtas _Python_ paketas [`csvkit`](http://csvkit.readthedocs.org/en/0.9.1/), galime pasinaudoti komandine eilutės programa `in2csv`:

In [23]:
! in2csv data/CA1.xls > data/CA1_csvkit.csv

In [24]:
! head data/CA1_csvkit.csv

ID,X,Y,klasteris
1,-0.485694686,2.075174209,1
2,-0.3682471,2.037708135,2
3,-1.409078419,-1.371631094,3
4,-1.434896397,-1.959709476,4
5,3.706690333,-0.803240913,5
6,-0.488402521,2.076363672,1
7,-0.365141811,2.053486526,2
8,-1.403284478,-1.387871943,3
9,-1.443300554,-1.940563955,4


Detali programos `in2csv` naudojimo dokumentacija:

In [25]:
! in2csv -h

usage: in2csv [-h] [-d DELIMITER] [-t] [-q QUOTECHAR] [-u {0,1,2,3}] [-b]
              [-p ESCAPECHAR] [-z MAXFIELDSIZE] [-e ENCODING] [-S] [-H] [-v]
              [-l] [--zero] [-f FILETYPE] [-s SCHEMA] [-k KEY] [-y SNIFFLIMIT]
              [--sheet SHEET] [--no-inference]
              [FILE]

Convert common, but less awesome, tabular data formats to CSV.

positional arguments:
  FILE                  The CSV file to operate on. If omitted, will accept
                        input on STDIN.

optional arguments:
  -h, --help            show this help message and exit
  -d DELIMITER, --delimiter DELIMITER
                        Delimiting character of the input CSV file.
  -t, --tabs            Specifies that the input CSV file is delimited with
                        tabs. Overrides "-d".
  -q QUOTECHAR, --quotechar QUOTECHAR
                        Character used to quote strings in the input CSV file.
  -u {0,1,2,3}, --quoting {0,1,2,3}
                    

## pySpark

Duomenys į _Apache Spark_ paprastai įkeliami iš duomenų bazių arba tekstinių failų, esančių lokalioje _Linux_ failų sistemoje arba _Hadoop_ paskirstytoje failų sistemoje (angl. Hadoop Distributed File System - [HDFS](http://hortonworks.com/hadoop/hdfs/)). Studijų modulyje **P160M132** apsiribosime darbu su tekstiniais failais. 

Toliau pateikiama keletas metodų kaip įkelti ir išsaugoti tekstinius failus su _Apache Spark_.

### Tekstiniai failai lokalioje _Linux_ failų sistemoje 

#### `sc.textFile` metodas

In [26]:
csvRDD = sc.textFile("./data/CA1.csv")
print(type(csvRDD))
csvRDD.take(5)

<class 'pyspark.rdd.RDD'>


['ID,X,Y,klasteris',
 '1,-0.485694686,2.075174209,1',
 '2,-0.3682471,2.037708135,2',
 '3,-1.409078419,-1.371631094,3',
 '4,-1.434896397,-1.959709476,4']

Matome, kad `sc.textFile` metodas grąžina tipo `RDD` objektą. Pažiūrime kokio tipo yra `RDD` turinys.

In [27]:
csvRDD.map(type).take(5)

[str, str, str, str, str]

Nieko nuostabaus, kad įkėlę tekstinį failą, turime [`str`](https://docs.python.org/3/library/stdtypes.html#text-sequence-type-str) (tekstinių simbolių sekos) tipo objektų rinkinį. Kadangi failą skaitome naudomi _Apache Spark_, tikėtina, kad norėsime atlikti kažką daugiau, negu išvesti pirmas **N** jo eilučių. Pastarajam veiksmui puikiai tinka _Linux_ komanda `head -n` **N**:

In [28]:
! head -n 5 data/CA1.csv

ID,X,Y,klasteris
1,-0.485694686,2.075174209,1
2,-0.3682471,2.037708135,2
3,-1.409078419,-1.371631094,3
4,-1.434896397,-1.959709476,4


Norėdami atlikti skaičiavimus, turime atitinkamus stulpelius konvertuoti į reikiamo tipo reikšmes, tačiau mūsų pirmoji eilutė yra stupelių pavadinimai. Pirmąją eilutę paimame naudodami `RDD` metodą `first`:

In [29]:
header = csvRDD.first()
header

'ID,X,Y,klasteris'

Atskiriame pirmąją eilutę nuo `RDD`. Nors skamba paprastai, tai nėra visiškai trivialus veiksmas.

In [30]:
import itertools as it

rowsRDD = csvRDD.mapPartitionsWithIndex(lambda idx, gen: it.islice(gen, 1, None) if idx == 0 else gen)

rowsRDD.take(5)

['1,-0.485694686,2.075174209,1',
 '2,-0.3682471,2.037708135,2',
 '3,-1.409078419,-1.371631094,3',
 '4,-1.434896397,-1.959709476,4',
 '5,3.706690333,-0.803240913,5']

Prieš aptardami ką tik naudotą anoniminę funkciją, panagrinėkime kokių tipų elementai pasiekiami šiai funkcijai:

In [31]:
csvRDD.mapPartitionsWithIndex(lambda elem1, elem2: (str(type(elem1)), str(type(elem2)))).take(5)

["<class 'int'>", "<class 'generator'>"]

`mapPartitionsWithIndex` metodo argumentas yra funkcija, kuriai pateikiami 2 argumentai: `RDD` particijos indeksas ir particijos [`generator`](https://docs.python.org/3/library/stdtypes.html#generator-types) tipo objektas. _Python_ generatoriai nesaugo visų savo elementų atmintyje iš karto, tačiau juos generuoja po vieną, tik tada kai jų paprašoma.

Todėl panaudojame _Python_ standartinės bibliotekos modulį `itertools`. Panaudodami [`itertools.islice`](https://docs.python.org/3/library/itertools.html?highlight=itertools%20islice#itertools.islice) klasę pirmosios particijos generatorių pakeičiame generatoriumi be pirmojo elemento.

**_Pastaba_**: Atminkime, kad _Python_ indeksai prasideda nuo 0.

Kadangi turime stulpelių pavadinimus ir duomenų eilutes atskirai, pastarąsias galime konvertuoti į skaitines reikšmes. Kad būtų aiškiau, po kiekvieno žingsnio pateiksime `RDD` elementų tipus.

In [32]:
splitRowsRDD = rowsRDD.map(lambda line: line.split(","))
splitRowsRDD.take(5)

[['1', '-0.485694686', '2.075174209', '1'],
 ['2', '-0.3682471', '2.037708135', '2'],
 ['3', '-1.409078419', '-1.371631094', '3'],
 ['4', '-1.434896397', '-1.959709476', '4'],
 ['5', '3.706690333', '-0.803240913', '5']]

`splitRowsRDD` elementų tipas:

In [33]:
splitRowsRDD.map(type).take(5)

[list, list, list, list, list]

`splitRowsRDD` elementų tipas yra _Python_ `list` (sąrašas). Kokie yra šių sąrašų tipai?

In [34]:
splitRowsRDD.map(lambda list_: list(map(type, list_))).take(5)

[[str, str, str, str],
 [str, str, str, str],
 [str, str, str, str],
 [str, str, str, str],
 [str, str, str, str]]

Matome, kad `splitRowsRDD` sudaro sąrašai su `str` tipo elementais.

Prieš atliekant skaičiavimus `str` tipo elementus reikia pavesti skaitinių tipų kintamaisiais:

In [35]:
typedRowsRDD = splitRowsRDD.map(lambda vals: [int(vals[0]), float(vals[1]), float(vals[2]), int(vals[3])])
typedRowsRDD.take(5)

[[1, -0.485694686, 2.075174209, 1],
 [2, -0.3682471, 2.037708135, 2],
 [3, -1.409078419, -1.371631094, 3],
 [4, -1.434896397, -1.959709476, 4],
 [5, 3.706690333, -0.803240913, 5]]

Ar pavyko?

In [36]:
typedRowsRDD.map(lambda list_: [type(element) for element in list_]).take(5)

[[int, float, float, int],
 [int, float, float, int],
 [int, float, float, int],
 [int, float, float, int],
 [int, float, float, int]]

Panašu, kad payko.

_Apache Spark_ turi ir aukštesnio lygio abstrakciją - [`DataFrame`](http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes). Prieš turimą `RDD` paverčiant į `DataFrame` tipo objektą, `RDD` sąrašus pakeisime _pySpark_ `Row` tipo objektais.

Eilutę su stulpelių pavadinimais atskyrėme anksčiau. Eilutė:

In [37]:
header

'ID,X,Y,klasteris'

Stulpelių pavadnimų sąrašas:

In [38]:
headerColumns = header.split(",")
headerColumns

['ID', 'X', 'Y', 'klasteris']

Pagal turimus stulpelius sukuriame savo duomenims pritaikytą `Row`.

In [39]:
import pyspark.sql

In [40]:
ca1Row = pyspark.sql.Row("ID", "X", "Y", "klasteris")
ca1Row

<Row(ID, X, Y, klasteris)>

arba

In [41]:
ca1Row = pyspark.sql.Row(headerColumns[0], headerColumns[1], headerColumns[2], headerColumns[3])
ca1Row

<Row(ID, X, Y, klasteris)>

arba

In [42]:
ca1Row = pyspark.sql.Row(*headerColumns)
ca1Row

<Row(ID, X, Y, klasteris)>

Sudarome `RDD` su `Row` tipo objektais:

In [43]:
ca1RowsRDD = typedRowsRDD.map(lambda values: ca1Row(values[0], values[1], values[2], values[3]))
ca1RowsRDD.take(5)

[Row(ID=1, X=-0.485694686, Y=2.075174209, klasteris=1),
 Row(ID=2, X=-0.3682471, Y=2.037708135, klasteris=2),
 Row(ID=3, X=-1.409078419, Y=-1.371631094, klasteris=3),
 Row(ID=4, X=-1.434896397, Y=-1.959709476, klasteris=4),
 Row(ID=5, X=3.706690333, Y=-0.803240913, klasteris=5)]

arba

In [44]:
ca1RowsRDD = typedRowsRDD.map(lambda values: ca1Row(*values))
ca1RowsRDD.take(5)

[Row(ID=1, X=-0.485694686, Y=2.075174209, klasteris=1),
 Row(ID=2, X=-0.3682471, Y=2.037708135, klasteris=2),
 Row(ID=3, X=-1.409078419, Y=-1.371631094, klasteris=3),
 Row(ID=4, X=-1.434896397, Y=-1.959709476, klasteris=4),
 Row(ID=5, X=3.706690333, Y=-0.803240913, klasteris=5)]

Ar tikrai pavyko?

In [45]:
ca1RowsRDD.map(type).take(5)

[pyspark.sql.types.Row,
 pyspark.sql.types.Row,
 pyspark.sql.types.Row,
 pyspark.sql.types.Row,
 pyspark.sql.types.Row]

Dabar galima `RDD` paversti į `DataFrame`:

In [46]:
ca1DF = ca1RowsRDD.toDF()
ca1DF

DataFrame[ID: bigint, X: double, Y: double, klasteris: bigint]

In [47]:
ca1DF.show(5)

+---+------------+------------+---------+
| ID|           X|           Y|klasteris|
+---+------------+------------+---------+
|  1|-0.485694686| 2.075174209|        1|
|  2|  -0.3682471| 2.037708135|        2|
|  3|-1.409078419|-1.371631094|        3|
|  4|-1.434896397|-1.959709476|        4|
|  5| 3.706690333|-0.803240913|        5|
+---+------------+------------+---------+
only showing top 5 rows



Puiku! `DataFrame` objektai bus naudojami su _Apache Spark_ _Machine Learning_ metodais.

Reikėjo atlikti nemažai darbo, kol iš tekstinio failo gavome `DataFrame` objektą. Galbūt šį procesą galime automatizuoti? Atliktus veiksmus sudedame į _Python_ funkciją:

In [48]:
from pyspark.sql import Row
from itertools import islice

def parseCsvToDf(csvFilePath):
    rawRdd = sc.textFile(csvFilePath)
    header = rawRdd.first().split(",")
    DataRow = Row(*[h.lower() for h in header])
    rowsRdd = (
        rawRdd
        .mapPartitionsWithIndex(lambda idx, gen: islice(gen, 1, None) if idx == 0 else gen)
        .map(lambda line: DataRow(*map(float, line.split(","))))
    )
    rowDf = rowsRdd.toDF()
    return rowDf

Pasinaudojame ką tik aprašyta funkcija:

In [49]:
ca1ParsedDF = parseCsvToDf("data/CA1.csv")
ca1ParsedDF

DataFrame[id: double, x: double, y: double, klasteris: double]

Aprašydami funkciją padarėme prielaidą, kad visos `.csv` failo duomenų eilučių reikšmės yra skaitinės. Ar veiks ši funkcija, jeigu bent vienas stulpelis turės tekstines reikšmes? Pamėgikime. Įrašome testinį failą ir jį nuskaitome:

In [50]:
test_filename = "data/csv_file_with_strings.csv"
with open(test_filename, "w") as f:
    f.writelines(["stulpelis_a,stulpelis_b\n", "simbolinė_reikšmė_1,10\n", "simbolinė_reikšmė_2,20\n"])

Patikrininame įrašyto failo turinį:

In [51]:
! cat data/csv_file_with_strings.csv

stulpelis_a,stulpelis_b
simbolinė_reikšmė_1,10
simbolinė_reikšmė_2,20


Bandome įkelti failą naudodami savo aprašytą funkciją (įvyks klaida):

In [52]:
testDF = parseCsvToDf(test_filename)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 18, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/srv/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/srv/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/srv/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/srv/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-48-2d1c5b82f951>", line 11, in <lambda>
ValueError: could not convert string to float: 'simbolinė_reikšmė_1'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/srv/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/srv/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/srv/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/srv/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-48-2d1c5b82f951>", line 11, in <lambda>
ValueError: could not convert string to float: 'simbolinė_reikšmė_1'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	... 1 more


Klaidos pranešime matome eilutę:

`ValueError: could not convert string to float: 'simbolinė_reikšmė_1'`

Ši klaida reikškia, kad simbolių eilutės tipo (angl. string) reikšmės  `simbolinė_reikšmė_1` nepavyko paveiksti į skaitinio tipo `float` reikšmę.

Norėdami aprašyti funkciją, kuri patikrintų galimus variantus ir stulpeliams parinktų teisingus tipus, turėtume daryti panašiai kaip daroma [čia](https://github.com/seahboonsiew/pyspark-csv/blob/master/pyspark_csv.py). Tada turėtume universalią funkciją tekstinio `RDD` pavertinmui į `DataFrame`. Tačiau atminkime, kad _Python_ yra interpretuojama programavimo kalba, todėl gerokai lėtesnė už kompiliuojamą [_Scala_](http://www.scala-lang.org/) programavimo kalbą, kuria ir parašytas _Apache Spark_. Žemiau pateiktas šių kalbų spartos palyginimas darbui su _Apache Spark_ paimtas iš [šio šaltinio](https://databricks.com/blog/2015/04/24/recent-performance-improvements-in-apache-spark-sql-python-dataframes-and-more.html).

<img src="https://databricks.com/wp-content/uploads/2015/02/Screen-Shot-2015-02-16-at-9.46.39-AM.png">

Matome, kad naudojant `DataFrame` objektų metodus skirtumo tarp kalbų nėra, tačiau jis labai ryškus `RDD` objektų metodams. Žemiau pateikiame _Scala_ kalba parašytą _Apache Spark_ paketą, kurį galima naudoti iš _Python_ kalbos _Apache Spark_ sąsajos.

#### [spark-csv](https://github.com/databricks/spark-csv) paketas iš http://spark-packages.org/

##### `.csv` failo įkėlimas

In [53]:
ca1EasyDF = sqlContext.read.format("com.databricks.spark.csv").options(header=True, inferSchema=True).load("data/CA1.csv")
ca1EasyDF

DataFrame[ID: int, X: double, Y: double, klasteris: int]

In [54]:
ca1EasyDF.show(5)

+---+------------+------------+---------+
| ID|           X|           Y|klasteris|
+---+------------+------------+---------+
|  1|-0.485694686| 2.075174209|        1|
|  2|  -0.3682471| 2.037708135|        2|
|  3|-1.409078419|-1.371631094|        3|
|  4|-1.434896397|-1.959709476|        4|
|  5| 3.706690333|-0.803240913|        5|
+---+------------+------------+---------+
only showing top 5 rows



arba

In [55]:
ca1EasyDF = sqlContext.read.load("data/CA1.csv", format="com.databricks.spark.csv", header=True, inferSchema=True)
ca1EasyDF

DataFrame[ID: int, X: double, Y: double, klasteris: int]

In [56]:
ca1EasyDF.show(5)

+---+------------+------------+---------+
| ID|           X|           Y|klasteris|
+---+------------+------------+---------+
|  1|-0.485694686| 2.075174209|        1|
|  2|  -0.3682471| 2.037708135|        2|
|  3|-1.409078419|-1.371631094|        3|
|  4|-1.434896397|-1.959709476|        4|
|  5| 3.706690333|-0.803240913|        5|
+---+------------+------------+---------+
only showing top 5 rows



Gavome ir stulpelių pavadinimus, ir jų tipus. Patikriname su testiniu failu:

In [57]:
testEasyDF = sqlContext.read.load(
    "data/csv_file_with_strings.csv", 
    format="com.databricks.spark.csv", 
    header=True, inferSchema=True)
testEasyDF

DataFrame[stulpelis_a: string, stulpelis_b: int]

In [58]:
testEasyDF.show()

+-------------------+-----------+
|        stulpelis_a|stulpelis_b|
+-------------------+-----------+
|simbolinė_reikšmė_1|         10|
|simbolinė_reikšmė_2|         20|
+-------------------+-----------+



Puiku!

Tikrai rekomenduotina `.csv` failų įkėlimui naudoti pastarąjį metodą :) Parinkčių paaiškinimai pateikti [čia](https://github.com/databricks/spark-csv#features).

##### `DataFrame` išsaugojimas `.csv` formatu

In [59]:
testEasyDF.write.format("com.databricks.spark.csv").save("data/csv_test_write")

**_Pastaba:_** _Apache Spark_ duomenis saugoja į direktoriją atskirais failais. Jeigu duomenų išsaugojimas atliktas sėktingai, direktorijoje sukuriamas tuščias failas `_SUCCESS`. Saugojimas negalimas, jeigu egzistuoja direktorija tokiu pačiu pavadinimu ir įvyks klaida 

`java.lang.RuntimeException: path data/csv_test_write already exists.`

Išsaugoti galima tik duomenų eilutes (be stulpelių pavadinimų eilutės).

In [60]:
# java.lang.RuntimeException: path data/csv_test_write already exists.
testEasyDF.write.format("com.databricks.spark.csv").save("data/csv_test_write")

Py4JJavaError: An error occurred while calling o233.save.
: java.lang.RuntimeException: path data/csv_test_write already exists.
	at scala.sys.package$.error(package.scala:27)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:157)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)


Žemiau pateikiama keletas komandų rezultatų peržiūrai.

In [61]:
# išvesti direktorijos turinį
! ls data/csv_test_write/

part-00000  _SUCCESS


In [62]:
#išvesti pirmasias 2 failo eilutes
! head -n 2 data/csv_test_write/part-00000

simbolinė_reikšmė_1,10
simbolinė_reikšmė_2,20


In [63]:
#išvesti paskutinę failo eilutę
! tail -n 1 data/csv_test_write/part-00000

simbolinė_reikšmė_2,20


In [64]:
# failas _SUCCESS tikrai tuščias
! cat data/csv_test_write/_SUCCESS

In [65]:
# išvesti sujuntą visų direktorijos failų turinį
! cat data/csv_test_write/*

simbolinė_reikšmė_1,10
simbolinė_reikšmė_2,20


In [66]:
# išvesti kiekvieno direktorijos failo pirmąją eilutę
! head -n 1 data/csv_test_write/*

==> data/csv_test_write/_SUCCESS <==

==> data/csv_test_write/part-00000 <==
simbolinė_reikšmė_1,10
