# Next steps:

- add the .rw.dat parsing to `DatFile` (with `raw` options in higher level classes where appropriate)
  - https://www.qdusa.com/siteDocs/appNotes/1500-022.pdf
- come up with an `Analysis` prototype/ABC
- implement `SimpleMvsHAnalysis` (getting things like `M_s`, `H_c`, `H_r`, etc. with appropriate scaling factors)
- implement basic serialization (i.e., `as_dict()` on all classes)
- create documentation and GitHub page
- add some basic plotting functionality

In [90]:
from __future__ import annotations
from dataclasses import dataclass, InitVar
import re
from collections import OrderedDict
import csv
from datetime import datetime
import numpy as np
import pandas as pd
from pathlib import Path
import magnetopy as mp
from magnetopy import DatFile, GenericFile
from magnetopy.data_files import filename_label
from magnetopy.parsing_utils import label_clusters, unique_values, find_outlier_indices, find_temp_turnaround_point, find_sequence_starts
from magnetopy import MvsH, ZFCFC, ZFC, FC, SampleInfo, Dataset
from magnetopy.experiments import _num_digits_after_decimal, _scale_dc_data, _add_uncorrected_moment_columns

import matplotlib.pyplot as plt

In [2]:
DATA_PATH = Path("data")

mvsh1_dat = DatFile(DATA_PATH / "mvsh1.dat")
mvsh2_dat = DatFile(DATA_PATH / "mvsh2.dat")
mvsh2a_dat = DatFile(DATA_PATH / "mvsh2a.dat")
mvsh2b_dat = DatFile(DATA_PATH / "mvsh2b.dat")
mvsh2c_dat = DatFile(DATA_PATH / "mvsh2c.dat")
mvsh3_dat = DatFile(DATA_PATH / "mvsh3.dat")
mvsh4_dat = DatFile(DATA_PATH / "mvsh4.dat")
mvsh5_dat = DatFile(DATA_PATH / "mvsh5.dat")
mvsh5rw_dat = DatFile(DATA_PATH / "mvsh5.rw.dat")
mvsh6_dat = DatFile(DATA_PATH / "mvsh6.dat")
mvsh7_dat = DatFile(DATA_PATH / "mvsh7.dat")
mvsh8_dat = DatFile(DATA_PATH / "mvsh8.dat")
mvsh9_dat = DatFile(DATA_PATH / "mvsh9.dat")
mvsh10_dat = DatFile(DATA_PATH / "mvsh10.dat")
mvsh11_dat = DatFile(DATA_PATH / "mvsh11.dat")
zfcfc1_dat = DatFile(DATA_PATH / "zfcfc1.dat")
zfcfc2_dat = DatFile(DATA_PATH / "zfcfc2.dat")
zfcfc3_dat = DatFile(DATA_PATH / "zfcfc3.dat")
zfcfc4_dat = DatFile(DATA_PATH / "zfcfc4.dat")
fc4a_dat = DatFile(DATA_PATH / "fc4a.dat")
fc4b_dat = DatFile(DATA_PATH / "fc4b.dat")
zfc4a_dat = DatFile(DATA_PATH / "zfc4a.dat")
zfc4b_dat = DatFile(DATA_PATH / "zfc4b.dat")
fc5_dat = DatFile(DATA_PATH / "fc5.dat")
fc5rw_dat = DatFile(DATA_PATH / "fc5.rw.dat")
zfc5_dat = DatFile(DATA_PATH / "zfc5.dat")
zfc5rw_dat = DatFile(DATA_PATH / "zfc5.rw.dat")
dataset4_dat = DatFile(DATA_PATH / "dataset4.dat")
pd_std1_dat = DatFile(DATA_PATH / "Pd_std1.dat")


In [3]:
commented_mvsh_dat = [mvsh4_dat, mvsh5_dat]
commented_mvsh_dat_w_dset = [mvsh4_dat, mvsh5_dat, dataset4_dat]
uncommented_mvsh_dat = [mvsh1_dat, mvsh2_dat, mvsh2a_dat, mvsh2b_dat, mvsh3_dat, mvsh6_dat, mvsh7_dat, mvsh8_dat, mvsh9_dat, mvsh10_dat, mvsh11_dat, pd_std1_dat]

In [4]:
mvsh1_2 = MvsH(mvsh1_dat, 2)
mvsh1_4 = MvsH(mvsh1_dat, 4)
mvsh1_6 = MvsH(mvsh1_dat, 6)
mvsh1_8 = MvsH(mvsh1_dat, 8)
mvsh1_10 = MvsH(mvsh1_dat, 10)
mvsh1_12 = MvsH(mvsh1_dat, 12)
mvsh1_300 = MvsH(mvsh1_dat, 300)

mvsh2_5 = MvsH(mvsh2_dat, 5)
mvsh2_300 = MvsH(mvsh2_dat, 300)
mvsh2a_5 = MvsH(mvsh2a_dat, 5)
mvsh2b_300 = MvsH(mvsh2b_dat, 300)
mvsh2c_5 = MvsH(mvsh2c_dat, 5)
mvsh2c_300 = MvsH(mvsh2c_dat, 300)

mvsh3_5 = MvsH(mvsh3_dat, 5)

mvsh4_293 = MvsH(mvsh4_dat, 293)

mvsh4ds_293 = MvsH(dataset4_dat, 293)

mvsh5_293 = MvsH(mvsh5_dat, 293)

mvsh6_300 = MvsH(mvsh6_dat, 300)

mvsh7_300 = MvsH(mvsh7_dat, 300)

mvsh8_2 = MvsH(mvsh8_dat, 2)

mvsh9_2 = MvsH(mvsh9_dat, 2)

mvsh10_5 = MvsH(mvsh10_dat, 5)

mvsh11_5 = MvsH(mvsh11_dat, 5)

pd_std1_300 = MvsH(pd_std1_dat, 300)

mvsh1 = [mvsh1_2, mvsh1_4, mvsh1_6, mvsh1_8, mvsh1_10, mvsh1_12, mvsh1_300]
mvsh2 = [mvsh2_5, mvsh2_300, mvsh2a_5, mvsh2b_300]
mvsh_commented = [mvsh4_293, mvsh4ds_293, mvsh5_293]
mvsh_rest = [mvsh3_5, mvsh6_300, mvsh7_300, mvsh8_2, mvsh9_2, pd_std1_300]

mvsh_uncommented = mvsh1 + mvsh2 + mvsh_rest

mvsh_all = mvsh1 + mvsh2 + mvsh_commented + mvsh_rest

In [5]:
uncommented_zfcfc_dat = [zfcfc1_dat, zfcfc2_dat, zfcfc3_dat]
uncommented_zfc_dat = [zfc5_dat]
uncommented_fc_dat = [fc5_dat]
commented_zfcfc_dat = [zfcfc4_dat]
commented_zfc_dat = [zfc4a_dat, zfc4b_dat]
commented_fc_dat = [fc4a_dat, fc4b_dat]


In [28]:
mvsh = DATA_PATH / "mvsh5.dat"
mvsh_raw = DATA_PATH / "mvsh5.rw.dat"

In [155]:
class ScanHeader:
    def __init__(self, direction: str, up_header: pd.Series) -> None:
        self.text: str = up_header["Comment"]
        self.direction = direction
        self.low_temp = self._get_value('low temp = (\d+\.\d+) K')
        self.high_temp = self._get_value('high temp = (\d+\.\d+) K')
        self.avg_temp = self._get_value('avg. temp = (\d+\.\d+) K')
        self.low_field = self._get_value('low field = (-?\d+\.\d+) Oe')
        self.high_field = self._get_value('high field = (-?\d+\.\d+) Oe')
        self.drift = self._get_value('drift = (-?\d+\.\d+) V/s')
        self.slope = self._get_value('slope = (-?\d+\.\d+) V/mm')
        self.squid_range = self._get_value('squid range = (\d+)')
        self.given_center = self._get_value('given center = (\d+\.\d+) mm')
        self.calculated_center = self._get_value('calculated center = (\d+\.\d+) mm')
        self.amp_fixed = self._get_value('amp fixed = (-?\d+\.\d+) V')
        self.amp_free = self._get_value('amp free =(-?\d+\.\d+) V')

    def _get_value(self, regex: str) -> float:
        return float(re.search(regex, self.text).group(1))
    
    def __repr__(self):
        avg_field = (self.low_field + self.high_field) / 2
        return f"ScanHeader({self.direction}, {avg_field:.2f} Oe, {self.avg_temp:.2f} K)"
    
    def __str__(self):
        avg_field = (self.low_field + self.high_field) / 2
        return f"{self.direction} scan at {avg_field:.2f} Oe, {self.avg_temp:2f} K"


class RawScan:
    def __init__(self, direction: str, scan: pd.DataFrame) -> None:
        self.direction = direction
        self.data = scan.copy()
        self.data.drop(
            columns = ["Comment", "Fixed C Fitted (V)", "Free C Fitted (V)"], inplace = True
        )
        self.data.reset_index(drop = True, inplace = True)
        self.start_time = self.data["Time Stamp (sec)"].iloc[0]

    def __repr__(self):
        return f"RawScan({self.direction} at {self.start_time} sec)"
    
    def __str__(self):
        return f"RawScan({self.direction} at {self.start_time} sec)"


class ProcessedScan:
    def __init__(self, scan: pd.DataFrame) -> None:
        self.data = scan.copy()
        self.data.drop(
            columns = ["Comment", "Raw Voltage (V)", "Processed Voltage (V)"], inplace = True
        )
        self.data.reset_index(drop = True, inplace = True)
        self.start_time = self.data["Time Stamp (sec)"].iloc[0]

    def __repr__(self):
        return f"ProcessedScan({self.start_time} sec)"
    
    def __str__(self):
        return f"ProcessedScan({self.start_time} sec)"

class DcMeasurement:
    def __init__(
        self,
        up_header: pd.Series,
        up_scan: pd.DataFrame,
        down_header: pd.Series,
        down_scan: pd.DataFrame,
        processed_scan: pd.DataFrame,
    ) -> None:
        self.up_header = ScanHeader("up", up_header)
        self.up_scan = RawScan("up", up_scan)
        self.down_header = ScanHeader("down", down_header)
        self.down_scan = RawScan("down", down_scan)
        self.processed_scan = ProcessedScan(processed_scan)

In [91]:
class DatFile(GenericFile):
    """A class for reading and storing data from a Quantum Design .dat file from a
    MPMS3 magnetometer.

    Attributes
    ----------
    local_path : Path
        The path to the .dat file.
    header : list[list[str]]
        The header of the .dat file.
    data : pd.DataFrame
        The data from the .dat file.
    comments : OrderedDict[str, list[str]]
        Any comments found within the "[Data]" section of the .dat file.
    length : int
        The length of the .dat file in bytes.
    sha512 : str
        The SHA512 hash of the .dat file.
    date_created : datetime
        The date and time the .dat file was created.
    experiments_in_file : list[str]
        The experiments contained in the .dat file. Can include "mvsh", "zfc", "fc",
        and/or "zfcfc".

    Methods
    -------
    as_dict()
        Serializes the DatFile object to a dictionary.
    """

    def __init__(self, file_path: str | Path, parse_raw: bool = False) -> None:
        super().__init__(file_path, "magnetometry")
        self.header = self._read_header()
        self.data = self._read_data()
        self.comments = self._get_comments()
        self.date_created = self._get_date_created()
        self.experiments_in_file = self._get_experiments_in_file()
        if parse_raw:
            rw_dat_file = self.local_path.parent / (self.local_path.stem + ".rw.dat")
            if rw_dat_file.exists():
                print(f"Found raw data file: {rw_dat_file}")

    def __str__(self) -> str:
        return f"DatFile({self.local_path.name})"

    def __repr__(self) -> str:
        return f"DatFile({self.local_path.name})"

    def _read_header(self, delimiter: str = "\t") -> list[list[str]]:
        header: list[list[str]] = []
        with self.local_path.open(encoding="utf-8") as f:
            reader = csv.reader(f, delimiter=delimiter)
            for row in reader:
                header.append(row)
                if row[0] == "[Data]":
                    break
        if len(header[2]) == 1:
            # some .dat files have a header that is delimited by commas
            header = self._read_header(delimiter=",")
        return header

    def _read_data(
        self,
        sep: str = "\t",
    ) -> pd.DataFrame:
        skip_rows = len(self.header)
        df = pd.read_csv(self.local_path, sep=sep, skiprows=skip_rows)
        if df.shape[1] == 1:
            # some .dat files have a header that is delimited by commas
            df = self._read_data(sep=",")
        return df

    def _get_comments(self) -> OrderedDict[str, list[str]]:
        comments = self.data["Comment"].dropna()
        comments = OrderedDict(comments)
        for key, value in comments.items():
            comments[key] = [comment.strip() for comment in value.split(",")]
        return comments

    def _get_date_created(self) -> datetime:
        for line in self.header:
            if line[0] == "FILEOPENTIME":
                day = line[2]
                hour = line[3]
                break
        hour24 = datetime.strptime(hour, "%I:%M %p")
        day = [int(x) for x in day.split("/")]
        return datetime(day[2], day[0], day[1], hour24.hour, hour24.minute)

    def _get_experiments_in_file(self) -> list[str]:
        experiments = []
        if self.comments:
            for comments in self.comments.values():
                for comment in comments:
                    if comment.lower() in ["mvsh", "zfc", "fc", "zfcfc"]:
                        experiments.append(comment.lower())
        elif (filename := filename_label(self.local_path.name, "", True)) != "unknown":
            experiments.append(filename)
        else:
            if len(self.data["Magnetic Field (Oe)"].unique()) == 1:
                experiments.append("zfcfc")
            else:
                experiments.append("mvsh")
        return experiments

    def as_dict(self) -> dict[str, Any]:
        """Serializes the DatFile object to a dictionary.

        Returns
        -------
        dict[str, Any]
            Contains the following keys: local_path, length, date_created, sha512,
            experiments_in_file.
        """
        return {
            "experiment_type": self.experiment_type,
            "local_path": str(self.local_path),
            "length": self.length,
            "date_created": self.date_created.isoformat(),
            "sha512": self.sha512,
            "experiments_in_file": self.experiments_in_file,
        }

In [197]:
mvsh = DatFile(DATA_PATH / "mvsh5.dat")
mvsh.data.head()

Unnamed: 0,Comment,Time Stamp (sec),Temperature (K),Magnetic Field (Oe),Moment (emu),M. Std. Err. (emu),Transport Action,Averaging Time (sec),Frequency (Hz),Peak Amplitude (mm),...,Map 07,Map 08,Map 09,Map 10,Map 11,Map 12,Map 13,Map 14,Map 15,Map 16
0,"MvsH, 20 C",3874607000.0,,,,,,,,,...,,,,,,,,,,
1,,3874608000.0,293.220718,-70000.039062,,,6.0,,,,...,,,,,,,,,,
2,,3874608000.0,293.201141,-65000.289062,,,6.0,,,,...,,,,,,,,,,
3,,3874608000.0,293.169449,-60000.429688,,,6.0,,,,...,,,,,,,,,,
4,,3874608000.0,293.225235,-55000.351562,,,6.0,,,,...,,,,,,,,,,


In [198]:
src1_comments = pd.DataFrame(
    {
        "Comment": ["some text", np.nan, np.nan, np.nan, "other text", np.nan, np.nan, np.nan],
        "col1": [np.nan, 1, 2, 3, np.nan, 4, 5, 6],
        "col2": [np.nan, 1, 2, 3, np.nan, 4, 5, 6],
    }
)
src1_comments

Unnamed: 0,Comment,col1,col2
0,some text,,
1,,1.0,1.0
2,,2.0,2.0
3,,3.0,3.0
4,other text,,
5,,4.0,4.0
6,,5.0,5.0
7,,6.0,6.0


In [199]:
src1_no_comments = pd.DataFrame(
    {
        "Comment": [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan],
        "col1": [1, 2, 3, 4, 5, 6],
        "col2": [1, 2, 3, 4, 5, 6],
    }
)
src1_no_comments

Unnamed: 0,Comment,col1,col2
0,,1,1
1,,2,2
2,,3,3
3,,4,4
4,,5,5
5,,6,6


In [200]:
src2 = pd.DataFrame({"col3":[1, 2, 3, 4, 5, 6]})
src2

Unnamed: 0,col3
0,1
1,2
2,3
3,4
4,5
5,6


In [240]:
has_comment = src1_comments["Comment"].notna()

In [245]:
has_comment[4]

True

In [249]:
def combine_dat_and_raw_dfs(dat: pd.DataFrame, raw: list[DcMeasurement]) -> pd.DataFrame:
    if len(dat) == len(raw):
        dat["raw_scan"] = raw
        return dat
    has_comment = dat["Comment"].notna()
    new_raw = []
    j = 0
    for i in range(len(dat)):
        if has_comment[i]:
            print(dat["Comment"][i])
            new_raw.append(np.nan)
            j += 1
        else:
            print(dat["Comment"][i])
            new_raw.append(raw[j])
            j += 1
    dat["raw_scan"] = new_raw
    return dat
        

In [250]:
combine_dat_and_raw_dfs(src1_comments, src2)

some text
nan


KeyError: 1

In [170]:
test['col3'] = col3

In [171]:
test

Unnamed: 0,comment,col1,col2,col3
0,some text,,,1.0
1,,1.0,1.0,2.0
2,,2.0,2.0,3.0
3,,3.0,3.0,4.0
4,other text,,,5.0
5,,4.0,4.0,6.0
6,,5.0,5.0,
7,,6.0,6.0,


In [30]:
dat = DatFile(mvsh, True)

Found raw data file: data/mvsh5.rw.dat


In [31]:
raw_dat = DatFile(mvsh_raw, True)

In [35]:
dat.data.shape

(230, 89)

In [54]:
header_idx = list(raw_dat.comments.keys())

In [78]:
up_header = raw_dat.data.iloc[header_idx[0]]
up_scan = raw_dat.data.iloc[header_idx[0] + 1 : header_idx[1]]
down_header = raw_dat.data.iloc[header_idx[1]]
down_scan = raw_dat.data.iloc[header_idx[1] + 1 : header_idx[1] + (header_idx[1] - header_idx[0])]
processed_scan = raw_dat.data.iloc[header_idx[1] + (header_idx[1] - header_idx[0]) : header_idx[2]]

In [143]:
meas = DcMeasurement(up_header, up_scan, down_header, down_scan, processed_scan)

In [146]:
meas.down_header.__dict__

{'text': ';low temp = 293.219696044922 K;high temp = 293.227996826172 K;avg. temp = 293.224720201994 K;low field = -70000.0390625 Oe;high field = -70000.0390625 Oe;drift = -0.0102812763361726 V/s;slope = 0.0011750030098483 V/mm;squid range = 10;given center = 34.2498902967861 mm;calculated center = 34.266242980957 mm;amp fixed = -4.51490592956543 V;amp free =-4.51291227340698 V',
 'direction': 'down',
 'low_temp': 293.219696044922,
 'high_temp': 293.227996826172,
 'avg_temp': 293.224720201994,
 'low_field': -70000.0390625,
 'high_field': -70000.0390625,
 'drift': -0.0102812763361726,
 'slope': 0.0011750030098483,
 'squid_range': 10.0,
 'given_center': 34.2498902967861,
 'calculated_center': 34.266242980957,
 'amp_fixed': -4.51490592956543,
 'amp_free': -4.51291227340698}

In [154]:
meas.processed_scan.data

Unnamed: 0,Time Stamp (sec),Raw Position (mm),Fixed C Fitted (V),Free C Fitted (V)
0,3.874608e+09,16.749889,-0.049361,-0.045461
1,3.874608e+09,16.924891,-0.050713,-0.046659
2,3.874608e+09,17.099890,-0.052112,-0.047901
3,3.874608e+09,17.274891,-0.053557,-0.049187
4,3.874608e+09,17.449890,-0.055051,-0.050519
...,...,...,...,...
195,3.874608e+09,50.874889,-0.055051,-0.063560
196,3.874608e+09,51.049889,-0.053557,-0.061816
197,3.874608e+09,51.224888,-0.052112,-0.060123
198,3.874608e+09,51.399891,-0.050713,-0.058481


In [121]:
up_header_obj = ScanHeader(up_header)
up_header_obj.__dict__

{'text': ';low temp = 293.219696044922 K;high temp = 293.227996826172 K;avg. temp = 293.224720201994 K;low field = -70000.0390625 Oe;high field = -70000.0390625 Oe;drift = -0.0102812763361726 V/s;slope = -0.0011750030098483 V/mm;squid range = 10;given center = 34.2498902967861 mm;calculated center = 34.266242980957 mm;amp fixed = -4.51490592956543 V;amp free =-4.51291227340698 V',
 'low_temp': 293.219696044922,
 'high_temp': 293.227996826172,
 'avg_temp': 293.224720201994,
 'low_field': -70000.0390625,
 'high_field': -70000.0390625,
 'drift': -0.0102812763361726,
 'slope': -0.0011750030098483,
 'squid_range': 10.0,
 'given_center': 34.2498902967861,
 'calculated_center': 34.266242980957,
 'amp_fixed': -4.51490592956543,
 'amp_free': -4.51291227340698}

In [122]:
down_header_obj = ScanHeader(down_header)
down_header_obj.__dict__

{'text': ';low temp = 293.219696044922 K;high temp = 293.227996826172 K;avg. temp = 293.224720201994 K;low field = -70000.0390625 Oe;high field = -70000.0390625 Oe;drift = -0.0102812763361726 V/s;slope = 0.0011750030098483 V/mm;squid range = 10;given center = 34.2498902967861 mm;calculated center = 34.266242980957 mm;amp fixed = -4.51490592956543 V;amp free =-4.51291227340698 V',
 'low_temp': 293.219696044922,
 'high_temp': 293.227996826172,
 'avg_temp': 293.224720201994,
 'low_field': -70000.0390625,
 'high_field': -70000.0390625,
 'drift': -0.0102812763361726,
 'slope': 0.0011750030098483,
 'squid_range': 10.0,
 'given_center': 34.2498902967861,
 'calculated_center': 34.266242980957,
 'amp_fixed': -4.51490592956543,
 'amp_free': -4.51291227340698}