In [13]:
%pip install --upgrade dask[complete] numpy pandas
import dask.dataframe as dd




Note: you may need to restart the kernel to use updated packages.


In [14]:
df = dd.read_csv(r"../data/*.csv",encoding='utf-8', sample=1000000, blocksize='64mb')

In [15]:
print(df.shape[0].compute(), df.shape[1])

  path_info,


ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+--------------+---------+----------+
| Column       | Found   | Expected |
+--------------+---------+----------+
| act          | float64 | int64    |
| bailable_ipc | object  | float64  |
+--------------+---------+----------+

The following columns also raised exceptions on conversion:

- bailable_ipc
  ValueError("could not convert string to float: 'bailable'")

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

dtype={'act': 'float64',
       'bailable_ipc': 'object'}

to the call to `read_csv`/`read_table`.

In [None]:
print(df)
df.shape
len(df)
df.head()

Dask DataFrame Structure:
               ddl_case_id    act  section bailable_ipc number_sections_ipc criminal
npartitions=64                                                                      
                    object  int64  float64      float64             float64    int64
                       ...    ...      ...          ...                 ...      ...
...                    ...    ...      ...          ...                 ...      ...
                       ...    ...      ...          ...                 ...      ...
                       ...    ...      ...          ...                 ...      ...
Dask Name: read-csv, 64 tasks


  path_info,


ParserError: Error tokenizing data. C error: Expected 6 fields in line 74, saw 7


In [16]:
import dask.datasets as ds
from dask import delayed, compute
from pathlib import Path  # For handling file paths
import yaml  # For reading and parsing YAML files


In [None]:

# class UnifiedReader:
#     def __init__(self):
#         self.readers = {
#             '.json': self.read_json,
#             '.yaml': self.read_yaml,
#             '.yml': self.read_yaml,
#             '.txt': self.read_text
#         }

#     @delayed
#     def read_json(self, file_path):
#         return dd.read_json(file_path, orient='records', lines=True)

#     @delayed
#     def read_yaml(self, file_path):
#         with open(file_path, 'r') as file:
#             data = yaml.safe_load(file)
#             return data

#     @delayed
#     def read_text(self, file_path):
#         with open(file_path, 'r', encoding='utf-8') as file:
#             return file.readlines()

#     def read_file(self, file_path: str):
#         ext = Path(file_path).suffix
#         if ext not in self.readers:
#             raise ValueError(f"Unsupported file type: {ext}")
#         return self.readers[ext](file_path)

#     def read_directory(self, dir_path: str):
#         files = list(Path(dir_path).rglob('*'))
#         tasks = [self.read_file(str(file_path)) for file_path in files if file_path.suffix in self.readers]
#         return compute(*tasks)

# def main():
#     reader = UnifiedReader()
#     data_dir = 'data'
#     csv_data = reader.read_directory('../data/*.csv')
#     json_data = reader.read_directory('../data/*.json')
#     yaml_data = reader.read_directory('../data/*.yaml')
#     txt_data = reader.read_directory('../data/*.txt')



#     print("Data from all files:")
#     print(csv_data)

# if __name__ == "__main__":
#     main()


Data from all files:
()


In [24]:
import dask.dataframe as dd
import yaml
from pathlib import Path
from dask import delayed, compute
import glob
import pandas as pd
import json


In [28]:

class UnifiedReader:
    def __init__(self):
        self.readers = {
            '.json': self.read_json,
            '.yaml': self.read_yaml,
            '.yml': self.read_yaml,
            '.txt': self.read_text,
            '.csv': self.read_csv  # Added CSV reader
        }


    @delayed
    def read_yaml(self, file_path):
        with open(file_path, 'r') as file:
            data = yaml.safe_load(file)
            return data

    @delayed
    def read_text(self, file_path):
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.readlines()

    @delayed
    def read_csv(self, file_path):
        f = dd.read_csv(r"../data/*.csv",encoding='utf-8', sample=1000000, blocksize='64mb')

    def read_file(self, file_path: str):
        ext = Path(file_path).suffix
        if ext not in self.readers:
            raise ValueError(f"Unsupported file type: {ext}")
        return self.readers[ext](file_path)

    def read_directory(self, glob_pattern: str):
        files = glob.glob(glob_pattern)
        tasks = [self.read_file(file_path) for file_path in files]
        return compute(*tasks)

    @delayed
    def read_json(self, file_path):
        try:
            # Try reading as line-delimited JSON
            df = dd.read_json(file_path, orient='records', lines=True, blocksize='64mb')
            return df.map_partitions(pd.json_normalize)
        except ValueError:
            # If that fails, try reading the entire file as a single JSON object
            with open(file_path, 'r') as f:
                data = json.load(f)
            if isinstance(data, list):
                return dd.from_pandas(pd.json_normalize(data), npartitions=1)
            else:
                return dd.from_pandas(pd.json_normalize([data]), npartitions=1)


def main():
    reader = UnifiedReader()
    data_dir = '../data'
    
    csv_data = reader.read_directory(f'{data_dir}/*.csv')
    json_data = reader.read_directory(f'{data_dir}/*.json')
    yaml_data = reader.read_directory(f'{data_dir}/*.yaml')
    txt_data = reader.read_directory(f'{data_dir}/*.txt')

    print("CSV Data:")
    print(csv_data)
    print("\nJSON Data:")
    print(json_data)
    print("\nYAML Data:")
    print(yaml_data)
    print("\nTXT Data:")
    print(txt_data)

if __name__ == "__main__":
    main()

CSV Data:
(None, None, None)

JSON Data:
(Dask DataFrame Structure:
              Act Title  Act ID Enactment Date Act Definition.0 Act Definition.1 Chapters.0.ID Chapters.0.Name Chapters.0.Sections.Section 1..heading Chapters.0.Sections.Section 1..paragraphs.0 Chapters.0.Sections.Section 1..paragraphs.1 Chapters.0.Sections.Section 1..paragraphs.2 Chapters.0.Sections.Section 2..heading Chapters.0.Sections.Section 2..paragraphs.0.text Chapters.0.Sections.Section 2..paragraphs.0.contains.0 Chapters.0.Sections.Section 2..paragraphs.0.contains.1.text Chapters.0.Sections.Section 2..paragraphs.0.contains.1.contains.0 Chapters.0.Sections.Section 2..paragraphs.0.contains.1.contains.1 Chapters.0.Sections.Section 2..paragraphs.0.contains.2 Chapters.0.Sections.Section 2..paragraphs.0.contains.3 Chapters.0.Sections.Section 2..paragraphs.0.contains.4 Chapters.1.ID Chapters.1.Name Chapters.1.Subheadings Chapters.2.ID Chapters.2.Name Chapters.2.Subheadings Chapters.3.ID Chapters.3.Name Chapters.3.Sub

In [30]:
import dask.dataframe as dd
import dask.bag as db
import pandas as pd
import yaml
import json
from pathlib import Path

def robust_text_reader(file_path, file_type=None, **kwargs):
    """
    A robust function to read various text file formats using Dask.
    
    Parameters:
    - file_path: str or list, path(s) to the file(s)
    - file_type: str, optional. If not provided, will be inferred from file extension
    - **kwargs: Additional arguments to pass to the specific reader function
    
    Returns:
    - Dask DataFrame or Dask Bag, depending on the file type and structure
    """
    if isinstance(file_path, list):
        file_path = [str(Path(f)) for f in file_path]
    else:
        file_path = str(Path(file_path))
    
    if not file_type:
        file_type = Path(file_path).suffix.lower()[1:]  # Remove the leading dot
    
    # Default parameters
    encoding = kwargs.get('encoding', 'utf-8')
    blocksize = kwargs.get('blocksize', '64MB')
    
    try:
        if file_type in ['csv', 'tsv', 'txt']:
            # Assume structured text file
            sep = kwargs.get('sep', ',' if file_type == 'csv' else '\t')
            return dd.read_csv(file_path, sep=sep, encoding=encoding, blocksize=blocksize, **kwargs)
        
        elif file_type == 'json':
            lines = kwargs.get('lines', True)
            return dd.read_json(file_path, encoding=encoding, blocksize=blocksize, lines=lines, **kwargs)
        
        elif file_type == 'yaml':
            def read_yaml(file):
                with open(file, 'r', encoding=encoding) as f:
                    return yaml.safe_load(f)
            return db.from_delayed(db.delayed(read_yaml)(file_path))
        
        else:
            # Unstructured text or unknown format
            text_bag = db.read_text(file_path, encoding=encoding)
            
            # Try to infer structure
            sample = text_bag.take(1)[0]
            try:
                # Check if it's JSON
                json.loads(sample)
                return dd.read_json(file_path, encoding=encoding, blocksize=blocksize, lines=True)
            except json.JSONDecodeError:
                # Check if it's CSV-like
                if ',' in sample or '\t' in sample:
                    sep = ',' if ',' in sample else '\t'
                    return dd.read_csv(file_path, sep=sep, encoding=encoding, blocksize=blocksize)
                else:
                    # Return as raw text bag if no structure is inferred
                    return text_bag
    
    except Exception as e:
        print(f"Error reading file: {e}")
        # Fallback to pandas for smaller files or debugging
        try:
            return dd.from_pandas(pd.read_csv(file_path, **kwargs), npartitions=1)
        except Exception:
            return db.read_text(file_path, encoding=encoding)

robust_text_reader('../data/*.txt')




Unnamed: 0_level_0,Chander Mohan Negi & Ors. Vs. State of Himachal Pradesh & Ors.Â
npartitions=13,Unnamed: 1_level_1
,object
,...
...,...
,...
,...
