# Tabele i widoki
Databricks umożliwia pracę na danych w różnej postaci. Możliwe jest pobieranie danych do DataFrames z plików zapisanych w formacie CSV, JSON lub parquet, ale nie są one wtedy dostępne dla wszystkich użytkownikó. By dane były globalnie dostępne, możliwe jest zaimportowanie ich do tabel, dzięki czemu mogą być odczytywane i edytowane w różnych klastrach przez wielu użytkowników.

Poniżej przedstawione jest odczytuwanie danych z tabeli.

In [0]:
# odczyt danych z tabeli creditcard do dataframe
sparkdf = spark.read.table("creditcard")

# wypisanie schematu tabeli
sparkdf.printSchema()

In [0]:
# wypisanie wszystkich danych z dataframe
display(sparkdf)

# SQL
Databricks umożliwia wykorzystywanie zapytań SQL do odczytu i edycji danych przechowywanych w tabelach. Wszystkie operacje wykonywane za pomocą SQL są wykonywane bezpośrednio na tabelach, więc zmieniają dane dla wszystkich użytkowników. By zmodyfikować dane tylko na potrzeby jednego klastra możliwe jest utworzenie widoków tymczasowych, do których przesyłane są wymagane dane.

In [0]:
# wypisanie 5 pierwszych rekordów z tabeli
%sql
select * from creditcard limit(5)

In [0]:
%sql
select * from creditcard order by Time desc limit(5)

In [0]:
# zapisywanie danych w tabeli
%sql
insert into creditcard values(172793, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1,
-0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, -0.1, 5, 0
)

In [0]:
%sql
select * from creditcard order by Time desc limit(5)

#count()
Zlicza ilość wierszy w data frame

In [0]:
# utworzenie dataframe zawierające informacje o ilości transakcji poprawnych i podejrzanych
normal = sparkdf[sparkdf["Class"] == 0]
anomalies = sparkdf[sparkdf["Class"] == 1]

# pobranie liczby transakcji poprawnych i podejrzanych
normalCount = normal.count()
anomaliesCount = anomalies.count()
print("Record Count: {0:,}".format( normalCount ))
print("Record Count: {0:,}".format( anomaliesCount ))

#cache()

Dzięki cacheowaniu danych możliwe jest uzyskanie większej wydajności w Apache Spark poprzez skopiowanie wszystkich wybranych danych do lokalnego egzekutora. Domyślnie dane są pobierane bezpośrednio ze źródła, co może być czasochłonne, a lokalnie przechowywane dane w pamięci cache są dużo szybciej dostępne.

In [0]:
(anomalies
  .cache()         # Mark the DataFrame as cached
  .count()         # Materialize the cache
) 

#show()
Funkcja służąca do wypisywania danych z dataframe w konsoli, w przypadku języka python, zawiera ona dwa opcjonalne parametry, ilość wypisywanych wierszy oraz wartość boolean informującą czy dane mają być zawijane. W języku scala ta funkcja posiada dodatkowe metody przeciążenia.

In [0]:
sparkdf.show()

#display()
Jest to funkcja notebooka, która wypisuje dane w postaci tabeli. Mogą one zostać także wyświetlone w postaci różnych diagramów wybierając przykładowy z listy pod domyślnie wypisywaną tabelą. Parametry diagramów są edytowalne co umożliwa dopasowywanie danych do potrzeb. Podgląd wyników można pobrać na lokalny komputer w postaci pliku csv.

In [0]:
display(sparkdf)

#limit()
Z wybranego dataframe tworzony jest nowy, poprzez podanie w parametrze n wierszy, ile ma zostać skopiowanych do nowego dataframe.

In [0]:
limitedDF = sparkdf.limit(5)

display(limitedDF)

#select()
Funkcja tworzy nowy dataframe pobierając tylko wybrae kolumny.

In [0]:
onlyThreeDF = (sparkdf
  .select("Time", "Amount", "Class")
)
onlyThreeDF.printSchema()

In [0]:
display(onlyThreeDF)

#drop()
Tworzone jest nowy dataframe poprzez usunięcie wybranych kolumn.

In [0]:
onlyThreeDF = (sparkdf
  .drop("Time")
)
display(onlyThreeDF)

# distinct()
Funkcja wyszukuje i wybiera unikalne wartości wybranej kolumny oraz tworzy z nich nowy dataframe.

In [0]:
distinctDF = (sparkdf
  .select("Class")
  .distinct()
)

display(distinctDF)

# dropDuplicates()
Działa na podobnej zadadzie co funkcja distinct, ale umożliwia wybieranie wielu kolumn do wybierania unikalnych wierszy

In [0]:
distinctDF = (sparkdf
  .select("Class", "Amount")
  .distinct()
)

display(distinctDF)

# Funkcje tworzone przez użytkownika
Databricks umożliwia tworzenie własnych funkcji, jeśli istniejace nie mają wszystkich potrzebnych funkcjonalności.

Poniżej przedstawiona jest funkcja getColumn wybierająca kolumnę z zadanego zbioru danych.

In [0]:
def getColumn(df, name):
  column = (df
  .select(name)
  )
  return column

result = getColumn(sparkdf,"Amount")
display(result)

# Diagramy i analiza zbioru danych
Databricks umożliwa tworzenie diagramów z wypisanych danych. Wystarczy wybrać pod tabelą rodzaj diagramu i dostosować jego parametry.

Możliwe jest także wykorzystanie bibliotek obsługiwanych języków do tworzenia różnego rodzaju diagramów.

Poniższy diagram wykorzystuje biblioteki języka python do graficznego przedstawiania stosunku poprawnych transakcji do anomalii w tabeli creditcard.

In [0]:
# utworzenie dataframe zawierające informacje o ilości transakcji poprawnych i podejrzanych
normal = sparkdf[sparkdf["Class"] == 0]
anomalies = sparkdf[sparkdf["Class"] == 1]

# pobranie liczby transakcji poprawnych i podejrzanych
normalCount = normal.count()
anomaliesCount = anomalies.count()

In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

pandasdf = sparkdf.toPandas()

class_counts = pd.value_counts(pandasdf['Class'], sort = True)
class_counts.plot(kind = 'bar', rot=0)
plt.title("Class Distribution")
plt.xticks(range(2), ["Normal", "Anomaly"])
plt.xlabel("Label")
plt.ylabel("Counts")

In [0]:
def plot_histogram(df, bins, column, log_scale=False):
  bins = 100
  anomalies = df[df.Class == 1]
  normal = df[df.Class == 0]
  fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)
  fig.suptitle(f'Counts of {column} by Class')
  ax1.hist(anomalies[column], bins = bins, color="red")
  ax1.set_title('Anomaly')
  ax2.hist(normal[column], bins = bins, color="orange")
  ax2.set_title('Normal')
  plt.xlabel(f'{column}')
  plt.ylabel('Count')
  if log_scale:
    plt.yscale('log')
  plt.xlim((np.min(df[column]), np.max(df[column])))
  plt.show()

In [0]:
def plot_scatter(df, x_col, y_col, sharey = False):
  anomalies = df[df.Class == 1]
  normal = df[df.Class == 0]
  fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True,
  sharey=sharey)
  fig.suptitle(f'{y_col} over {x_col} by Class')
  ax1.scatter(anomalies[x_col], anomalies[y_col], color='red')
  ax1.set_title('Anomaly')
  ax2.scatter(normal[x_col], normal[y_col], color='orange')
  ax2.set_title('Normal')
  plt.xlabel(x_col)
  plt.ylabel(y_col)
  plt.show()

Wykres rozrzutu wartości danych w dataframe obejmujący wszystkie wartości danych. Wykreślone kolumny to Amount na osi X i Class na osi Y.

In [0]:
plt.scatter(pandasdf.Amount, pandasdf.Class)
plt.title("Transaction Amounts by Class")
plt.ylabel("Class")
plt.yticks(range(2), ["Normal", "Anomaly"])
plt.xlabel("Transaction Amounts ($)")
plt.show()

Histogram zliczeń wartości danych zorganizowanych w interwały w kolumnie Kwota. Ilość bins wyniosła 100, co oznacza, że interwał każdego słupka na histogramie to zasięg danych w kolumnie Amount podzielona przez liczbę pojemników

In [0]:
bins = 100
plot_histogram(pandasdf, bins, "Amount", log_scale=True)

Wykres rozrzutu wartości w dataframe tabeli creditcard z danymi w kolumnie Time na osi X i dane w kolumnie Class w oś y

In [0]:
plt.scatter(pandasdf.Time, pandasdf.Class)
plt.title("Transactions over Time by Class")
plt.ylabel("Class")
plt.yticks(range(2), ["Normal", "Anomaly"])
plt.xlabel("Time (s)")
plt.show()

Wartości danych dla kolumny Time oś X i Amount oś Y w tabeli

In [0]:
plot_scatter(pandasdf, "Time", "Amount")

Wykres danych w kolumni V1

In [0]:
plot_histogram(pandasdf, bins, "V1")

Wykres wartości w kolumnach Amount na osi X oraz V1 na osi Y

Parametr sharey=True wymusza tą samą wartość na osi Y

In [0]:
plot_scatter(pandasdf, "Amount", "V1", sharey=True)

Wykres wartości w kolumnach Time na osi X oraz V1 na osi Y

In [0]:
plot_scatter(pandasdf, "Time", "V1", sharey=True)

Histogramy dla danych w każdej kolumnie od V1 do V28

In [0]:
for f in range(1, 29):
  print(f'V{f} Counts')
  plot_histogram(pandasdf, bins, f'V{f}')

Wykres dla kolumny V12 pokazuje odchylenie między anomaliami a normalnymi punktami danych. Chociaż znaczna część anomalii mieści się w zakresie normalnych punktów, nadal istnieje spora liczba anomalii, które wykraczają poza ten zakres. Można więc zobaczyć, że w porównaniu z czasem dane w kolumnie V12 również pokazują to odchylenie od normalnych punktów danych

Wykres dla kolumny V17 pokazuje odchylenie między anomaliami a normalnymi punktami danych. Podobnie jak w przypadku wartości dla V12, można zaobserwować kolejne odchylenie między normalnymi punktami a punktami podejrzanymi. W tym przypadku różnica wydaje się być nieco bardziej wyraźna, ponieważ anomalie wydają się być bardziej rozłożone niż dla V12

In [0]:
for f in range(1, 29):
  print(f'V{f} vs Time')
  plot_scatter(pandasdf, "Time", f'V{f}', sharey=True)

Patrząc na wykres rozrzutu dla kolumny V10, można zobaczyć wyraźne odchylenie fałszywych punktów od normalnych punktów. Jeśli chodzi o relację kolumn V do ilości, wydaje się, że więcej kolumn wykazuje zwiększone odchylenie w porównaniu z wcześniejszymi wykresami. Ta różnica nie jest tak duża, ponieważ nadal widać, że znaczna część anomalii występuje w normalnym klastrze danych. Jednak nadal daje to modelowi pewien kontekst, w jaki sposób oszukańcza transakcja różni się od zwykłej transakcji

In [0]:
Patrząc na wykres rozrzutu dla kolumny V12 po raz kolejny widać wyraźne odchylenie fałszywych punktów od normalnych punktów. W tym przypadku większość fałszywych punktów wydaje się odbiegać od normalnej grupy punktów. Można również zobaczyć, że istnieje pasmo normalnych punktów daleko od głównego klastra i że pasmo pokrywa się z anomalnymi punktami danych.

In [0]:
for f in range(1, 29):
  print(f'Amount vs V{f}')
  plot_scatter(pandasdf, f'V{f}', "Amount", sharey=True)